This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2683b06cb Remove JsonEqual (#2317)
2683b06cb is described below
commit 2683b06cbc213327b464f6458385877c9a7666b0
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Aug 5 01:00:10 2022 -0700
Remove JsonEqual (#2317)
* Remove JsonEqual
* Fix clippy
* Fix test
* Parse schema metadata from JSON
---
arrow/src/array/array.rs | 3 +-
arrow/src/array/equal_json.rs | 1170 ------------------
arrow/src/array/mod.rs | 5 -
arrow/src/ipc/reader.rs | 8 +-
arrow/src/ipc/writer.rs | 8 +-
arrow/src/temporal_conversions.rs | 2 +-
arrow/src/util/integration_util.rs | 1259 +++++++++++++-------
.../src/bin/arrow-json-integration-test.rs | 5 +-
integration-testing/src/lib.rs | 732 +-----------
parquet/src/arrow/arrow_reader.rs | 60 +-
10 files changed, 815 insertions(+), 2437 deletions(-)
diff --git a/arrow/src/array/array.rs b/arrow/src/array/array.rs
index 844c68d13..9766f857c 100644
--- a/arrow/src/array/array.rs
+++ b/arrow/src/array/array.rs
@@ -21,12 +21,11 @@ use std::fmt;
use std::sync::Arc;
use super::*;
-use crate::array::equal_json::JsonEqual;
use crate::buffer::{Buffer, MutableBuffer};
/// Trait for dealing with different types of array at runtime when the type of the
/// array is not known in advance.
-pub trait Array: fmt::Debug + Send + Sync + JsonEqual {
+pub trait Array: fmt::Debug + Send + Sync {
/// Returns the array as [`Any`](std::any::Any) so that it can be
/// downcasted to a specific implementation.
///
diff --git a/arrow/src/array/equal_json.rs b/arrow/src/array/equal_json.rs
deleted file mode 100644
index e7d14aae8..000000000
--- a/arrow/src/array/equal_json.rs
+++ /dev/null
@@ -1,1170 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::*;
-use crate::array::BasicDecimalArray;
-use crate::datatypes::*;
-use crate::util::decimal::BasicDecimal;
-use array::Array;
-use hex::FromHex;
-use serde_json::value::Value::{Null as JNull, Object, String as JString};
-use serde_json::Value;
-
-/// Trait for comparing arrow array with json array
-pub trait JsonEqual {
- /// Checks whether arrow array equals to json array.
- fn equals_json(&self, json: &[&Value]) -> bool;
-
- /// Checks whether arrow array equals to json array.
- fn equals_json_values(&self, json: &[Value]) -> bool {
- let refs = json.iter().collect::<Vec<&Value>>();
-
- self.equals_json(&refs)
- }
-}
-
-impl<'a, T: JsonEqual> JsonEqual for &'a T {
- fn equals_json(&self, json: &[&Value]) -> bool {
- T::equals_json(self, json)
- }
-
- fn equals_json_values(&self, json: &[Value]) -> bool {
- T::equals_json_values(self, json)
- }
-}
-
-/// Implement array equals for numeric type
-impl<T: ArrowPrimitiveType> JsonEqual for PrimitiveArray<T> {
- fn equals_json(&self, json: &[&Value]) -> bool {
- self.len() == json.len()
- && (0..self.len()).all(|i| match json[i] {
- Value::Null => self.is_null(i),
- v => {
- self.is_valid(i)
- && Some(v) == self.value(i).into_json_value().as_ref()
- }
- })
- }
-}
-
-/// Implement array equals for numeric type
-impl JsonEqual for BooleanArray {
- fn equals_json(&self, json: &[&Value]) -> bool {
- self.len() == json.len()
- && (0..self.len()).all(|i| match json[i] {
- Value::Null => self.is_null(i),
- v => {
- self.is_valid(i)
- && Some(v) == self.value(i).into_json_value().as_ref()
- }
- })
- }
-}
-
-impl<T: ArrowPrimitiveType> PartialEq<Value> for PrimitiveArray<T> {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(array) => self.equals_json_values(array),
- _ => false,
- }
- }
-}
-
-impl<T: ArrowPrimitiveType> PartialEq<PrimitiveArray<T>> for Value {
- fn eq(&self, arrow: &PrimitiveArray<T>) -> bool {
- match self {
- Value::Array(array) => arrow.equals_json_values(array),
- _ => false,
- }
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> JsonEqual for GenericListArray<OffsetSize> {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- (0..self.len()).all(|i| match json[i] {
- Value::Array(v) => self.is_valid(i) && self.value(i).equals_json_values(v),
- Value::Null => self.is_null(i) || self.value_length(i).is_zero(),
- _ => false,
- })
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> PartialEq<Value> for GenericListArray<OffsetSize> {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> PartialEq<GenericListArray<OffsetSize>> for Value {
- fn eq(&self, arrow: &GenericListArray<OffsetSize>) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl<T: ArrowPrimitiveType> JsonEqual for DictionaryArray<T> {
- fn equals_json(&self, json: &[&Value]) -> bool {
- // todo: this is wrong: we must test the values also
- self.keys().equals_json(json)
- }
-}
-
-impl<T: ArrowPrimitiveType> PartialEq<Value> for DictionaryArray<T> {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl<T: ArrowPrimitiveType> PartialEq<DictionaryArray<T>> for Value {
- fn eq(&self, arrow: &DictionaryArray<T>) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl JsonEqual for FixedSizeListArray {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- (0..self.len()).all(|i| match json[i] {
- Value::Array(v) => self.is_valid(i) && self.value(i).equals_json_values(v),
- Value::Null => self.is_null(i) || self.value_length() == 0,
- _ => false,
- })
- }
-}
-
-impl PartialEq<Value> for FixedSizeListArray {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl PartialEq<FixedSizeListArray> for Value {
- fn eq(&self, arrow: &FixedSizeListArray) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl JsonEqual for StructArray {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- let all_object = json.iter().all(|v| matches!(v, Object(_) | JNull));
-
- if !all_object {
- return false;
- }
-
- for column_name in self.column_names() {
- let json_values = json
- .iter()
- .map(|obj| obj.get(column_name).unwrap_or(&Value::Null))
- .collect::<Vec<&Value>>();
-
- if !self
- .column_by_name(column_name)
- .map(|arr| arr.equals_json(&json_values))
- .unwrap_or(false)
- {
- return false;
- }
- }
-
- true
- }
-}
-
-impl PartialEq<Value> for StructArray {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl PartialEq<StructArray> for Value {
- fn eq(&self, arrow: &StructArray) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl JsonEqual for MapArray {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- (0..self.len()).all(|i| match json[i] {
- Value::Array(v) => self.is_valid(i) && self.value(i).equals_json_values(v),
- Value::Null => self.is_null(i) || self.value_length(i).eq(&0),
- _ => false,
- })
- }
-}
-
-impl PartialEq<Value> for MapArray {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl PartialEq<MapArray> for Value {
- fn eq(&self, arrow: &MapArray) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> JsonEqual for GenericBinaryArray<OffsetSize> {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- (0..self.len()).all(|i| match json[i] {
- JString(s) => {
- // binary data is sometimes hex encoded, this checks if bytes are equal,
- // and if not converting to hex is attempted
- self.is_valid(i)
- && (s.as_str().as_bytes() == self.value(i)
- || Vec::from_hex(s.as_str()) == Ok(self.value(i).to_vec()))
- }
- JNull => self.is_null(i),
- _ => false,
- })
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> PartialEq<Value> for GenericBinaryArray<OffsetSize> {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> PartialEq<GenericBinaryArray<OffsetSize>> for Value {
- fn eq(&self, arrow: &GenericBinaryArray<OffsetSize>) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> JsonEqual for GenericStringArray<OffsetSize> {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- (0..self.len()).all(|i| match json[i] {
- JString(s) => self.is_valid(i) && s.as_str() == self.value(i),
- JNull => self.is_null(i),
- _ => false,
- })
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> PartialEq<Value> for GenericStringArray<OffsetSize> {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl<OffsetSize: OffsetSizeTrait> PartialEq<GenericStringArray<OffsetSize>> for Value {
- fn eq(&self, arrow: &GenericStringArray<OffsetSize>) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl JsonEqual for FixedSizeBinaryArray {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- (0..self.len()).all(|i| match json[i] {
- JString(s) => {
- // binary data is sometimes hex encoded, this checks if bytes are equal,
- // and if not converting to hex is attempted
- self.is_valid(i)
- && (s.as_str().as_bytes() == self.value(i)
- || Vec::from_hex(s.as_str()) == Ok(self.value(i).to_vec()))
- }
- JNull => self.is_null(i),
- _ => false,
- })
- }
-}
-
-impl PartialEq<Value> for FixedSizeBinaryArray {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl PartialEq<FixedSizeBinaryArray> for Value {
- fn eq(&self, arrow: &FixedSizeBinaryArray) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl JsonEqual for Decimal128Array {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- (0..self.len()).all(|i| match json[i] {
- JString(s) => {
- self.is_valid(i)
- && (s
- .parse::<i128>()
- .map_or_else(|_| false, |v| v == self.value(i).as_i128()))
- }
- JNull => self.is_null(i),
- _ => false,
- })
- }
-}
-
-impl JsonEqual for Decimal256Array {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- (0..self.len()).all(|i| match json[i] {
- JString(s) => self.is_valid(i) && (s == &self.value(i).to_string()),
- JNull => self.is_null(i),
- _ => false,
- })
- }
-}
-
-impl PartialEq<Value> for Decimal128Array {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl PartialEq<Decimal128Array> for Value {
- fn eq(&self, arrow: &Decimal128Array) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl JsonEqual for UnionArray {
- fn equals_json(&self, _json: &[&Value]) -> bool {
- unimplemented!(
- "Added to allow UnionArray to implement the Array trait: see ARROW-8547"
- )
- }
-}
-
-impl JsonEqual for NullArray {
- fn equals_json(&self, json: &[&Value]) -> bool {
- if self.len() != json.len() {
- return false;
- }
-
- // all JSON values must be nulls
- json.iter().all(|&v| v == &JNull)
- }
-}
-
-impl PartialEq<NullArray> for Value {
- fn eq(&self, arrow: &NullArray) -> bool {
- match self {
- Value::Array(json_array) => arrow.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl PartialEq<Value> for NullArray {
- fn eq(&self, json: &Value) -> bool {
- match json {
- Value::Array(json_array) => self.equals_json_values(json_array),
- _ => false,
- }
- }
-}
-
-impl JsonEqual for ArrayRef {
- fn equals_json(&self, json: &[&Value]) -> bool {
- self.as_ref().equals_json(json)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use crate::error::Result;
- use std::{convert::TryFrom, sync::Arc};
-
- fn create_list_array<U: AsRef<[i32]>, T: AsRef<[Option<U>]>>(
- builder: &mut ListBuilder<Int32Builder>,
- data: T,
- ) -> Result<ListArray> {
- for d in data.as_ref() {
- if let Some(v) = d {
- builder.values().append_slice(v.as_ref());
- builder.append(true);
- } else {
- builder.append(false);
- }
- }
- Ok(builder.finish())
- }
-
- /// Create a fixed size list of 2 value lengths
- fn create_fixed_size_list_array<U: AsRef<[i32]>, T: AsRef<[Option<U>]>>(
- builder: &mut FixedSizeListBuilder<Int32Builder>,
- data: T,
- ) -> Result<FixedSizeListArray> {
- for d in data.as_ref() {
- if let Some(v) = d {
- builder.values().append_slice(v.as_ref());
- builder.append(true);
- } else {
- for _ in 0..builder.value_length() {
- builder.values().append_null();
- }
- builder.append(false);
- }
- }
- Ok(builder.finish())
- }
-
- #[test]
- fn test_primitive_json_equal() {
- // Test equaled array
- let arrow_array = Int32Array::from(vec![Some(1), None, Some(2),
Some(3)]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- 1, null, 2, 3
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequaled array
- let arrow_array = Int32Array::from(vec![Some(1), None, Some(2),
Some(3)]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- 1, 1, 2, 3
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test unequal length case
- let arrow_array = Int32Array::from(vec![Some(1), None, Some(2),
Some(3)]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- 1, 1
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test not json array type case
- let arrow_array = Int32Array::from(vec![Some(1), None, Some(2),
Some(3)]);
- let json_array: Value = serde_json::from_str(
- r#"
- {
- "a": 1
- }
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-
- #[test]
- fn test_list_json_equal() {
- // Test equal case
- let arrow_array = create_list_array(
- &mut ListBuilder::new(Int32Builder::new(10)),
- &[Some(&[1, 2, 3]), None, Some(&[4, 5, 6])],
- )
- .unwrap();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- [1, 2, 3],
- null,
- [4, 5, 6]
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequal case
- let arrow_array = create_list_array(
- &mut ListBuilder::new(Int32Builder::new(10)),
- &[Some(&[1, 2, 3]), None, Some(&[4, 5, 6])],
- )
- .unwrap();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- [1, 2, 3],
- [7, 8],
- [4, 5, 6]
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect type case
- let arrow_array = create_list_array(
- &mut ListBuilder::new(Int32Builder::new(10)),
- &[Some(&[1, 2, 3]), None, Some(&[4, 5, 6])],
- )
- .unwrap();
- let json_array: Value = serde_json::from_str(
- r#"
- {
- "a": 1
- }
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-
- #[test]
- fn test_fixed_size_list_json_equal() {
- // Test equal case
- let arrow_array = create_fixed_size_list_array(
- &mut FixedSizeListBuilder::new(Int32Builder::new(10), 3),
- &[Some(&[1, 2, 3]), None, Some(&[4, 5, 6])],
- )
- .unwrap();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- [1, 2, 3],
- null,
- [4, 5, 6]
- ]
- "#,
- )
- .unwrap();
- println!("{:?}", arrow_array);
- println!("{:?}", json_array);
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequal case
- let arrow_array = create_fixed_size_list_array(
- &mut FixedSizeListBuilder::new(Int32Builder::new(10), 3),
- &[Some(&[1, 2, 3]), None, Some(&[4, 5, 6])],
- )
- .unwrap();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- [1, 2, 3],
- [7, 8, 9],
- [4, 5, 6]
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect type case
- let arrow_array = create_fixed_size_list_array(
- &mut FixedSizeListBuilder::new(Int32Builder::new(10), 3),
- &[Some(&[1, 2, 3]), None, Some(&[4, 5, 6])],
- )
- .unwrap();
- let json_array: Value = serde_json::from_str(
- r#"
- {
- "a": 1
- }
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-
- #[test]
- fn test_string_json_equal() {
- // Test the equal case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None, None]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- "world",
- null,
- null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequal case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None, None]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- "arrow",
- null,
- null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test unequal length case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- "arrow",
- null,
- null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect type case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None]);
- let json_array: Value = serde_json::from_str(
- r#"
- {
- "a": 1
- }
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect value type case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- 1,
- null,
- null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-
- #[test]
- fn test_binary_json_equal() {
- // Test the equal case
- let mut builder = BinaryBuilder::new(6);
- builder.append_value(b"hello");
- builder.append_null();
- builder.append_null();
- builder.append_value(b"world");
- builder.append_null();
- builder.append_null();
- let arrow_array = builder.finish();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- "world",
- null,
- null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequal case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None, None]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- "arrow",
- null,
- null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test unequal length case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- "arrow",
- null,
- null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect type case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None]);
- let json_array: Value = serde_json::from_str(
- r#"
- {
- "a": 1
- }
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect value type case
- let arrow_array =
- StringArray::from(vec![Some("hello"), None, None, Some("world"),
None]);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- 1,
- null,
- null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-
- #[test]
- fn test_fixed_size_binary_json_equal() {
- // Test the equal case
- let mut builder = FixedSizeBinaryBuilder::new(15, 5);
- builder.append_value(b"hello").unwrap();
- builder.append_null();
- builder.append_value(b"world").unwrap();
- let arrow_array: FixedSizeBinaryArray = builder.finish();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- "world"
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequal case
- builder.append_value(b"hello").unwrap();
- builder.append_null();
- builder.append_value(b"world").unwrap();
- let arrow_array: FixedSizeBinaryArray = builder.finish();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- "arrow"
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test unequal length case
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- null,
- "world"
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect type case
- let json_array: Value = serde_json::from_str(
- r#"
- {
- "a": 1
- }
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect value type case
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- 1
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-
- #[test]
- fn test_decimal_json_equal() {
- // Test the equal case
- let arrow_array = [Some(1_000), None, Some(-250)]
- .iter()
- .collect::<Decimal128Array>()
- .with_precision_and_scale(23, 6)
- .unwrap();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "1000",
- null,
- "-250"
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequal case
- let arrow_array = [Some(1_000), None, Some(55)]
- .iter()
- .collect::<Decimal128Array>()
- .with_precision_and_scale(23, 6)
- .unwrap();
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "1000",
- null,
- "-250"
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test unequal length case
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "1000",
- null,
- null,
- "55"
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect type case
- let json_array: Value = serde_json::from_str(
- r#"
- {
- "a": 1
- }
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect value type case
- let json_array: Value = serde_json::from_str(
- r#"
- [
- "hello",
- null,
- 1
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-
- #[test]
- fn test_struct_json_equal() {
- let strings: ArrayRef = Arc::new(StringArray::from(vec![
- Some("joe"),
- None,
- None,
- Some("mark"),
- Some("doe"),
- ]));
- let ints: ArrayRef = Arc::new(Int32Array::from(vec![
- Some(1),
- Some(2),
- None,
- Some(4),
- Some(5),
- ]));
-
- let arrow_array =
- StructArray::try_from(vec![("f1", strings.clone()), ("f2",
ints.clone())])
- .unwrap();
-
- let json_array: Value = serde_json::from_str(
- r#"
- [
- {
- "f1": "joe",
- "f2": 1
- },
- {
- "f2": 2
- },
- null,
- {
- "f1": "mark",
- "f2": 4
- },
- {
- "f1": "doe",
- "f2": 5
- }
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequal length case
- let json_array: Value = serde_json::from_str(
- r#"
- [
- {
- "f1": "joe",
- "f2": 1
- },
- {
- "f2": 2
- },
- null,
- {
- "f1": "mark",
- "f2": 4
- }
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test incorrect type case
- let json_array: Value = serde_json::from_str(
- r#"
- {
- "f1": "joe",
- "f2": 1
- }
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
-
- // Test not all object case
- let json_array: Value = serde_json::from_str(
- r#"
- [
- {
- "f1": "joe",
- "f2": 1
- },
- 2,
- null,
- {
- "f1": "mark",
- "f2": 4
- }
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-
- #[test]
- fn test_null_json_equal() {
- // Test equaled array
- let arrow_array = NullArray::new(4);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- null, null, null, null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.eq(&json_array));
- assert!(json_array.eq(&arrow_array));
-
- // Test unequaled array
- let arrow_array = NullArray::new(2);
- let json_array: Value = serde_json::from_str(
- r#"
- [
- null, null, null
- ]
- "#,
- )
- .unwrap();
- assert!(arrow_array.ne(&json_array));
- assert!(json_array.ne(&arrow_array));
- }
-}
diff --git a/arrow/src/array/mod.rs b/arrow/src/array/mod.rs
index c49a3a527..3785af85a 100644
--- a/arrow/src/array/mod.rs
+++ b/arrow/src/array/mod.rs
@@ -175,7 +175,6 @@ mod builder;
mod cast;
mod data;
mod equal;
-mod equal_json;
#[cfg(feature = "ffi")]
mod ffi;
mod iterator;
@@ -597,10 +596,6 @@ pub use self::transform::{Capacities, MutableArrayData};
pub use self::iterator::*;
-// --------------------- Array Equality ---------------------
-
-pub use self::equal_json::JsonEqual;
-
// --------------------- Array's values comparison ---------------------
pub use self::ord::{build_compare, DynComparator};
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index f3af214ce..ce44d74a1 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -1215,7 +1215,7 @@ mod tests {
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader));
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
@@ -1336,7 +1336,7 @@ mod tests {
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader));
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
// the next batch must be empty
assert!(reader.next().is_none());
// the stream must indicate that it's finished
@@ -1373,7 +1373,7 @@ mod tests {
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader));
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
@@ -1406,7 +1406,7 @@ mod tests {
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader));
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
// the next batch must be empty
assert!(reader.next().is_none());
// the stream must indicate that it's finished
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index 374f9fad1..f0942b074 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -1319,7 +1319,7 @@ mod tests {
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader));
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
@@ -1370,7 +1370,7 @@ mod tests {
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader));
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
@@ -1434,7 +1434,7 @@ mod tests {
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader));
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
@@ -1495,7 +1495,7 @@ mod tests {
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader));
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
diff --git a/arrow/src/temporal_conversions.rs b/arrow/src/temporal_conversions.rs
index fda004a6d..3b1fcdc84 100644
--- a/arrow/src/temporal_conversions.rs
+++ b/arrow/src/temporal_conversions.rs
@@ -127,7 +127,7 @@ pub fn timestamp_ns_to_datetime(v: i64) -> NaiveDateTime {
// extract seconds from nanoseconds
v / NANOSECONDS,
// discard extracted seconds
- (v % NANOSECONDS) as u32,
+ if v > 0 { (v % NANOSECONDS) as u32 } else { 0 },
)
}
diff --git a/arrow/src/util/integration_util.rs b/arrow/src/util/integration_util.rs
index 0077b2fb7..65d54a02b 100644
--- a/arrow/src/util/integration_util.rs
+++ b/arrow/src/util/integration_util.rs
@@ -19,13 +19,22 @@
//!
//! These utilities define structs that read the integration JSON format for integration testing purposes.
+use hex::decode;
+use num::BigInt;
+use num::Signed;
use serde_derive::{Deserialize, Serialize};
-use serde_json::{Map as SJMap, Number as VNumber, Value};
+use serde_json::{Map as SJMap, Value};
+use std::collections::HashMap;
+use std::sync::Arc;
use crate::array::*;
+use crate::buffer::{Buffer, MutableBuffer};
+use crate::compute;
use crate::datatypes::*;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
use crate::record_batch::{RecordBatch, RecordBatchReader};
+use crate::util::bit_util;
+use crate::util::decimal::{BasicDecimal, Decimal256};
/// A struct that represents an Arrow file with a schema and record batches
#[derive(Deserialize, Serialize, Debug)]
@@ -42,6 +51,8 @@ pub struct ArrowJson {
#[derive(Deserialize, Serialize, Debug)]
pub struct ArrowJsonSchema {
pub fields: Vec<ArrowJsonField>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub metadata: Option<Vec<HashMap<String, String>>>,
}
/// Fields are left as JSON `Value` as they vary by `DataType`
@@ -107,14 +118,14 @@ pub struct DictionaryIndexType {
}
/// A struct that partially reads the Arrow JSON record batch
-#[derive(Deserialize, Serialize, Debug)]
+#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ArrowJsonBatch {
count: usize,
pub columns: Vec<ArrowJsonColumn>,
}
/// A struct that partially reads the Arrow JSON dictionary batch
-#[derive(Deserialize, Serialize, Debug)]
+#[derive(Deserialize, Serialize, Debug, Clone)]
#[allow(non_snake_case)]
pub struct ArrowJsonDictionaryBatch {
pub id: i64,
@@ -139,17 +150,45 @@ pub struct ArrowJsonColumn {
impl ArrowJson {
/// Compare the Arrow JSON with a record batch reader
- pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) -> bool {
+ pub fn equals_reader(&self, reader: &mut dyn RecordBatchReader) ->
Result<bool> {
if !self.schema.equals_schema(&reader.schema()) {
- return false;
+ return Ok(false);
}
- self.batches.iter().all(|col| {
+
+ for json_batch in self.get_record_batches()?.into_iter() {
let batch = reader.next();
match batch {
- Some(Ok(batch)) => col.equals_batch(&batch),
- _ => false,
+ Some(Ok(batch)) => {
+ if json_batch != batch {
+ println!("json: {:?}", json_batch);
+ println!("batch: {:?}", batch);
+ return Ok(false);
+ }
+ }
+ _ => return Ok(false),
}
- })
+ }
+
+ Ok(true)
+ }
+
+ pub fn get_record_batches(&self) -> Result<Vec<RecordBatch>> {
+ let schema = self.schema.to_arrow_schema()?;
+
+ let mut dictionaries = HashMap::new();
+ self.dictionaries.iter().for_each(|dict_batches| {
+ dict_batches.iter().for_each(|d| {
+ dictionaries.insert(d.id, d.clone());
+ });
+ });
+
+ let batches: Result<Vec<_>> = self
+ .batches
+ .iter()
+ .map(|col| record_batch_from_json(&schema, col.clone(),
Some(&dictionaries)))
+ .collect();
+
+ batches
}
}
@@ -169,6 +208,28 @@ impl ArrowJsonSchema {
}
true
}
+
+ fn to_arrow_schema(&self) -> Result<Schema> {
+ let arrow_fields: Result<Vec<_>> = self
+ .fields
+ .iter()
+ .map(|field| field.to_arrow_field())
+ .collect();
+
+ if let Some(metadatas) = &self.metadata {
+ let mut metadata: HashMap<String, String> = HashMap::new();
+
+ metadatas.iter().for_each(|pair| {
+ let key = pair.get("key").unwrap();
+ let value = pair.get("value").unwrap();
+ metadata.insert(key.clone(), value.clone());
+ });
+
+ Ok(Schema::new_with_metadata(arrow_fields?, metadata))
+ } else {
+ Ok(Schema::new(arrow_fields?))
+ }
+ }
}
impl ArrowJsonField {
@@ -199,251 +260,731 @@ impl ArrowJsonField {
}
}
-impl ArrowJsonBatch {
- /// Compare the Arrow JSON record batch with a `RecordBatch`
- fn equals_batch(&self, batch: &RecordBatch) -> bool {
- if self.count != batch.num_rows() {
- return false;
+pub fn record_batch_from_json(
+ schema: &Schema,
+ json_batch: ArrowJsonBatch,
+ json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
+) -> Result<RecordBatch> {
+ let mut columns = vec![];
+
+ for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
+ let col = array_from_json(field, json_col, json_dictionaries)?;
+ columns.push(col);
+ }
+
+ RecordBatch::try_new(Arc::new(schema.clone()), columns)
+}
+
+/// Construct an Arrow array from a partially typed JSON column
+pub fn array_from_json(
+ field: &Field,
+ json_col: ArrowJsonColumn,
+ dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
+) -> Result<ArrayRef> {
+ match field.data_type() {
+ DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
+ DataType::Boolean => {
+ let mut b = BooleanBuilder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_bool().unwrap()),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
}
- let num_columns = self.columns.len();
- if num_columns != batch.num_columns() {
- return false;
+ DataType::Int8 => {
+ let mut b = Int8Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_i64().ok_or_else(|| {
+ ArrowError::JsonError(format!(
+ "Unable to get {:?} as int64",
+ value
+ ))
+ })? as i8),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
}
- let schema = batch.schema();
- self.columns
- .iter()
- .zip(batch.columns())
- .zip(schema.fields())
- .all(|((col, arr), field)| {
- // compare each column based on its type
- if &col.name != field.name() {
- return false;
- }
- let json_array: Vec<Value> = json_from_col(col,
field.data_type());
- match field.data_type() {
- DataType::Null => {
- let arr: &NullArray =
- arr.as_any().downcast_ref::<NullArray>().unwrap();
- // NullArrays should have the same length, json_array
is empty
- arr.len() == col.count
- }
- DataType::Boolean => {
- let arr =
arr.as_any().downcast_ref::<BooleanArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Int8 => {
- let arr =
arr.as_any().downcast_ref::<Int8Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Int16 => {
- let arr =
arr.as_any().downcast_ref::<Int16Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Int32 | DataType::Date32 | DataType::Time32(_)
=> {
- let arr = Int32Array::from(arr.data().clone());
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Int64
- | DataType::Date64
- | DataType::Time64(_)
- | DataType::Timestamp(_, _)
- | DataType::Duration(_) => {
- let arr = Int64Array::from(arr.data().clone());
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Interval(IntervalUnit::YearMonth) => {
- let arr =
IntervalYearMonthArray::from(arr.data().clone());
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Interval(IntervalUnit::DayTime) => {
- let arr =
IntervalDayTimeArray::from(arr.data().clone());
- let x = json_array
- .iter()
- .map(|v| {
- match v {
- Value::Null => Value::Null,
- Value::Object(v) => {
- // interval has days and milliseconds
- let days: i32 =
-
v.get("days").unwrap().as_i64().unwrap()
- as i32;
- let milliseconds: i32 = v
- .get("milliseconds")
- .unwrap()
- .as_i64()
- .unwrap()
- as i32;
- let value: i64 = unsafe {
- std::mem::transmute::<[i32; 2],
i64>([
- days,
- milliseconds,
- ])
- };
- Value::Number(VNumber::from(value))
+ DataType::Int16 => {
+ let mut b = Int16Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_i64().unwrap() as i16),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::Int32
+ | DataType::Date32
+ | DataType::Time32(_)
+ | DataType::Interval(IntervalUnit::YearMonth) => {
+ let mut b = Int32Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_i64().unwrap() as i32),
+ _ => b.append_null(),
+ };
+ }
+ let array = Arc::new(b.finish()) as ArrayRef;
+ compute::cast(&array, field.data_type())
+ }
+ DataType::Int64
+ | DataType::Date64
+ | DataType::Time64(_)
+ | DataType::Timestamp(_, _)
+ | DataType::Duration(_)
+ | DataType::Interval(IntervalUnit::DayTime) => {
+ let mut b = Int64Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(match value {
+ Value::Number(n) => n.as_i64().unwrap(),
+ Value::String(s) => {
+ s.parse().expect("Unable to parse string as i64")
+ }
+ Value::Object(ref map)
+ if map.contains_key("days")
+ && map.contains_key("milliseconds") =>
+ {
+ match field.data_type() {
+ DataType::Interval(IntervalUnit::DayTime) => {
+ let days = map.get("days").unwrap();
+ let milliseconds =
map.get("milliseconds").unwrap();
+
+ match (days, milliseconds) {
+ (Value::Number(d), Value::Number(m))
=> {
+ let mut bytes = [0_u8; 8];
+ let m = (m.as_i64().unwrap() as
i32)
+ .to_le_bytes();
+ let d = (d.as_i64().unwrap() as
i32)
+ .to_le_bytes();
+
+ let c = [d, m].concat();
+
bytes.copy_from_slice(c.as_slice());
+ i64::from_le_bytes(bytes)
+ }
+ _ => panic!(
+ "Unable to parse {:?} as interval
daytime",
+ value
+ ),
}
- // return null if Value is not an object
- _ => Value::Null,
}
- })
- .collect::<Vec<Value>>();
- arr.equals_json(&x.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Interval(IntervalUnit::MonthDayNano) => {
- let arr =
IntervalMonthDayNanoArray::from(arr.data().clone());
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::UInt8 => {
- let arr =
arr.as_any().downcast_ref::<UInt8Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::UInt16 => {
- let arr =
arr.as_any().downcast_ref::<UInt16Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::UInt32 => {
- let arr =
arr.as_any().downcast_ref::<UInt32Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::UInt64 => {
- let arr =
arr.as_any().downcast_ref::<UInt64Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Float32 => {
- let arr =
arr.as_any().downcast_ref::<Float32Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Float64 => {
- let arr =
arr.as_any().downcast_ref::<Float64Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Binary => {
- let arr =
arr.as_any().downcast_ref::<BinaryArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::LargeBinary => {
- let arr =
-
arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::FixedSizeBinary(_) => {
- let arr =
-
arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Utf8 => {
- let arr =
arr.as_any().downcast_ref::<StringArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::LargeUtf8 => {
- let arr =
-
arr.as_any().downcast_ref::<LargeStringArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::List(_) => {
- let arr =
arr.as_any().downcast_ref::<ListArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::LargeList(_) => {
- let arr =
arr.as_any().downcast_ref::<LargeListArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::FixedSizeList(_, _) => {
- let arr =
-
arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Struct(_) => {
- let arr =
arr.as_any().downcast_ref::<StructArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Map(_, _) => {
- let arr =
arr.as_any().downcast_ref::<MapArray>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Decimal128(_, _) => {
- let arr =
arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
-
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
- }
- DataType::Dictionary(ref key_type, _) => match
key_type.as_ref() {
- DataType::Int8 => {
- let arr = arr
- .as_any()
- .downcast_ref::<Int8DictionaryArray>()
- .unwrap();
- arr.equals_json(
-
&json_array.iter().collect::<Vec<&Value>>()[..],
- )
- }
- DataType::Int16 => {
- let arr = arr
- .as_any()
- .downcast_ref::<Int16DictionaryArray>()
- .unwrap();
- arr.equals_json(
-
&json_array.iter().collect::<Vec<&Value>>()[..],
- )
- }
- DataType::Int32 => {
- let arr = arr
- .as_any()
- .downcast_ref::<Int32DictionaryArray>()
- .unwrap();
- arr.equals_json(
-
&json_array.iter().collect::<Vec<&Value>>()[..],
- )
- }
- DataType::Int64 => {
- let arr = arr
- .as_any()
- .downcast_ref::<Int64DictionaryArray>()
- .unwrap();
- arr.equals_json(
-
&json_array.iter().collect::<Vec<&Value>>()[..],
- )
+ _ => panic!(
+ "Unable to parse {:?} as interval daytime",
+ value
+ ),
+ }
}
- DataType::UInt8 => {
- let arr = arr
- .as_any()
- .downcast_ref::<UInt8DictionaryArray>()
- .unwrap();
- arr.equals_json(
-
&json_array.iter().collect::<Vec<&Value>>()[..],
+ _ => panic!("Unable to parse {:?} as number", value),
+ }),
+ _ => b.append_null(),
+ };
+ }
+ let array = Arc::new(b.finish()) as ArrayRef;
+ compute::cast(&array, field.data_type())
+ }
+ DataType::UInt8 => {
+ let mut b = UInt8Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_u64().unwrap() as u8),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::UInt16 => {
+ let mut b = UInt16Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_u64().unwrap() as u16),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::UInt32 => {
+ let mut b = UInt32Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_u64().unwrap() as u32),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::UInt64 => {
+ let mut b = UInt64Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => {
+ if value.is_string() {
+ b.append_value(
+ value
+ .as_str()
+ .unwrap()
+ .parse()
+ .expect("Unable to parse string as u64"),
)
- }
- DataType::UInt16 => {
- let arr = arr
- .as_any()
- .downcast_ref::<UInt16DictionaryArray>()
- .unwrap();
- arr.equals_json(
-
&json_array.iter().collect::<Vec<&Value>>()[..],
+ } else if value.is_number() {
+ b.append_value(
+ value.as_u64().expect("Unable to read number
as u64"),
)
+ } else {
+ panic!("Unable to parse value {:?} as u64", value)
}
- DataType::UInt32 => {
- let arr = arr
- .as_any()
- .downcast_ref::<UInt32DictionaryArray>()
- .unwrap();
- arr.equals_json(
-
&json_array.iter().collect::<Vec<&Value>>()[..],
- )
+ }
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::Interval(IntervalUnit::MonthDayNano) => {
+ let mut b = IntervalMonthDayNanoBuilder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(match value {
+ Value::Object(v) => {
+ let months = v.get("months").unwrap();
+ let days = v.get("days").unwrap();
+ let nanoseconds = v.get("nanoseconds").unwrap();
+ match (months, days, nanoseconds) {
+ (
+ Value::Number(months),
+ Value::Number(days),
+ Value::Number(nanoseconds),
+ ) => {
+ let months = months.as_i64().unwrap() as
i32;
+ let days = days.as_i64().unwrap() as i32;
+ let nanoseconds =
nanoseconds.as_i64().unwrap();
+ let months_days_ns: i128 = ((nanoseconds
as i128)
+ & 0xFFFFFFFFFFFFFFFF)
+ << 64
+ | ((days as i128) & 0xFFFFFFFF) << 32
+ | ((months as i128) & 0xFFFFFFFF);
+ months_days_ns
+ }
+ (_, _, _) => {
+ panic!("Unable to parse {:?} as
MonthDayNano", v)
+ }
+ }
}
- DataType::UInt64 => {
- let arr = arr
- .as_any()
- .downcast_ref::<UInt64DictionaryArray>()
+ _ => panic!("Unable to parse {:?} as MonthDayNano",
value),
+ }),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::Float32 => {
+ let mut b = Float32Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_f64().unwrap() as f32),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::Float64 => {
+ let mut b = Float64Builder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_f64().unwrap()),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::Binary => {
+ let mut b = BinaryBuilder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => {
+ let v = decode(value.as_str().unwrap()).unwrap();
+ b.append_value(&v)
+ }
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::LargeBinary => {
+ let mut b = LargeBinaryBuilder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => {
+ let v = decode(value.as_str().unwrap()).unwrap();
+ b.append_value(&v)
+ }
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::Utf8 => {
+ let mut b = StringBuilder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_str().unwrap()),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::LargeUtf8 => {
+ let mut b = LargeStringBuilder::new(json_col.count);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => b.append_value(value.as_str().unwrap()),
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::FixedSizeBinary(len) => {
+ let mut b = FixedSizeBinaryBuilder::new(json_col.count, *len);
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => {
+ let v = hex::decode(value.as_str().unwrap()).unwrap();
+ b.append_value(&v)?
+ }
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::List(child_field) => {
+ let null_buf = create_null_buf(&json_col);
+ let children = json_col.children.clone().unwrap();
+ let child_array = array_from_json(
+ child_field,
+ children.get(0).unwrap().clone(),
+ dictionaries,
+ )?;
+ let offsets: Vec<i32> = json_col
+ .offset
+ .unwrap()
+ .iter()
+ .map(|v| v.as_i64().unwrap() as i32)
+ .collect();
+ let list_data = ArrayData::builder(field.data_type().clone())
+ .len(json_col.count)
+ .offset(0)
+ .add_buffer(Buffer::from(&offsets.to_byte_slice()))
+ .add_child_data(child_array.into_data())
+ .null_bit_buffer(Some(null_buf))
+ .build()
+ .unwrap();
+ Ok(Arc::new(ListArray::from(list_data)))
+ }
+ DataType::LargeList(child_field) => {
+ let null_buf = create_null_buf(&json_col);
+ let children = json_col.children.clone().unwrap();
+ let child_array = array_from_json(
+ child_field,
+ children.get(0).unwrap().clone(),
+ dictionaries,
+ )?;
+ let offsets: Vec<i64> = json_col
+ .offset
+ .unwrap()
+ .iter()
+ .map(|v| match v {
+ Value::Number(n) => n.as_i64().unwrap(),
+ Value::String(s) => s.parse::<i64>().unwrap(),
+ _ => panic!("64-bit offset must be either string or
number"),
+ })
+ .collect();
+ let list_data = ArrayData::builder(field.data_type().clone())
+ .len(json_col.count)
+ .offset(0)
+ .add_buffer(Buffer::from(&offsets.to_byte_slice()))
+ .add_child_data(child_array.into_data())
+ .null_bit_buffer(Some(null_buf))
+ .build()
+ .unwrap();
+ Ok(Arc::new(LargeListArray::from(list_data)))
+ }
+ DataType::FixedSizeList(child_field, _) => {
+ let children = json_col.children.clone().unwrap();
+ let child_array = array_from_json(
+ child_field,
+ children.get(0).unwrap().clone(),
+ dictionaries,
+ )?;
+ let null_buf = create_null_buf(&json_col);
+ let list_data = ArrayData::builder(field.data_type().clone())
+ .len(json_col.count)
+ .add_child_data(child_array.into_data())
+ .null_bit_buffer(Some(null_buf))
+ .build()
+ .unwrap();
+ Ok(Arc::new(FixedSizeListArray::from(list_data)))
+ }
+ DataType::Struct(fields) => {
+ // construct struct with null data
+ let null_buf = create_null_buf(&json_col);
+ let mut array_data = ArrayData::builder(field.data_type().clone())
+ .len(json_col.count)
+ .null_bit_buffer(Some(null_buf));
+
+ for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
+ let array = array_from_json(field, col, dictionaries)?;
+ array_data = array_data.add_child_data(array.into_data());
+ }
+
+ let array = StructArray::from(array_data.build().unwrap());
+ Ok(Arc::new(array))
+ }
+ DataType::Dictionary(key_type, value_type) => {
+ let dict_id = field.dict_id().ok_or_else(|| {
+ ArrowError::JsonError(format!(
+ "Unable to find dict_id for field {:?}",
+ field
+ ))
+ })?;
+ // find dictionary
+ let dictionary = dictionaries
+ .ok_or_else(|| {
+ ArrowError::JsonError(format!(
+ "Unable to find any dictionaries for field {:?}",
+ field
+ ))
+ })?
+ .get(&dict_id);
+ match dictionary {
+ Some(dictionary) => dictionary_array_from_json(
+ field,
+ json_col,
+ key_type,
+ value_type,
+ dictionary,
+ dictionaries,
+ ),
+ None => Err(ArrowError::JsonError(format!(
+ "Unable to find dictionary for field {:?}",
+ field
+ ))),
+ }
+ }
+ DataType::Decimal128(precision, scale) => {
+ let mut b = Decimal128Builder::new(json_col.count, *precision,
*scale);
+ // C++ interop tests involve incompatible decimal values
+ unsafe {
+ b.disable_value_validation();
+ }
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => {
+
b.append_value(value.as_str().unwrap().parse::<i128>().unwrap())?
+ }
+ _ => b.append_null(),
+ };
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::Decimal256(precision, scale) => {
+ let mut b = Decimal256Builder::new(json_col.count, *precision,
*scale);
+ // C++ interop tests involve incompatible decimal values
+ unsafe {
+ b.disable_value_validation();
+ }
+ for (is_valid, value) in json_col
+ .validity
+ .as_ref()
+ .unwrap()
+ .iter()
+ .zip(json_col.data.unwrap())
+ {
+ match is_valid {
+ 1 => {
+ let str = value.as_str().unwrap();
+ let integer = BigInt::parse_bytes(str.as_bytes(),
10).unwrap();
+ let integer_bytes = integer.to_signed_bytes_le();
+ let mut bytes = if integer.is_positive() {
+ [0_u8; 32]
+ } else {
+ [255_u8; 32]
+ };
+ bytes[0..integer_bytes.len()]
+ .copy_from_slice(integer_bytes.as_slice());
+ let decimal =
+ Decimal256::try_new_from_bytes(*precision, *scale,
&bytes)
.unwrap();
- arr.equals_json(
-
&json_array.iter().collect::<Vec<&Value>>()[..],
- )
- }
- t => panic!("Unsupported dictionary comparison for
{:?}", t),
- },
- t => panic!("Unsupported comparison for {:?}", t),
+ b.append_value(&decimal)?;
+ }
+ _ => b.append_null(),
}
- })
+ }
+ Ok(Arc::new(b.finish()))
+ }
+ DataType::Map(child_field, _) => {
+ let null_buf = create_null_buf(&json_col);
+ let children = json_col.children.clone().unwrap();
+ let child_array = array_from_json(
+ child_field,
+ children.get(0).unwrap().clone(),
+ dictionaries,
+ )?;
+ let offsets: Vec<i32> = json_col
+ .offset
+ .unwrap()
+ .iter()
+ .map(|v| v.as_i64().unwrap() as i32)
+ .collect();
+ let array_data = ArrayData::builder(field.data_type().clone())
+ .len(json_col.count)
+ .add_buffer(Buffer::from(&offsets.to_byte_slice()))
+ .add_child_data(child_array.into_data())
+ .null_bit_buffer(Some(null_buf))
+ .build()
+ .unwrap();
+
+ let array = MapArray::from(array_data);
+ Ok(Arc::new(array))
+ }
+ DataType::Union(fields, field_type_ids, _) => {
+ let type_ids = if let Some(type_id) = json_col.type_id {
+ type_id
+ } else {
+ return Err(ArrowError::JsonError(
+ "Cannot find expected type_id in json column".to_string(),
+ ));
+ };
+
+ let offset: Option<Buffer> = json_col.offset.map(|offsets| {
+ let offsets: Vec<i32> =
+ offsets.iter().map(|v| v.as_i64().unwrap() as
i32).collect();
+ Buffer::from(&offsets.to_byte_slice())
+ });
+
+ let mut children: Vec<(Field, Arc<dyn Array>)> = vec![];
+ for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
+ let array = array_from_json(field, col, dictionaries)?;
+ children.push((field.clone(), array));
+ }
+
+ let array = UnionArray::try_new(
+ field_type_ids,
+ Buffer::from(&type_ids.to_byte_slice()),
+ offset,
+ children,
+ )
+ .unwrap();
+ Ok(Arc::new(array))
+ }
+ t => Err(ArrowError::JsonError(format!(
+ "data type {:?} not supported",
+ t
+ ))),
+ }
+}
+
+pub fn dictionary_array_from_json(
+ field: &Field,
+ json_col: ArrowJsonColumn,
+ dict_key: &DataType,
+ dict_value: &DataType,
+ dictionary: &ArrowJsonDictionaryBatch,
+ dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
+) -> Result<ArrayRef> {
+ match dict_key {
+ DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::UInt8
+ | DataType::UInt16
+ | DataType::UInt32
+ | DataType::UInt64 => {
+ let null_buf = create_null_buf(&json_col);
+
+ // build the key data into a buffer, then construct values
separately
+ let key_field = Field::new_dict(
+ "key",
+ dict_key.clone(),
+ field.is_nullable(),
+ field
+ .dict_id()
+ .expect("Dictionary fields must have a dict_id value"),
+ field
+ .dict_is_ordered()
+ .expect("Dictionary fields must have a dict_is_ordered
value"),
+ );
+ let keys = array_from_json(&key_field, json_col, None)?;
+ // note: not enough info on nullability of dictionary
+ let value_field = Field::new("value", dict_value.clone(), true);
+ let values = array_from_json(
+ &value_field,
+ dictionary.data.columns[0].clone(),
+ dictionaries,
+ )?;
+
+ // convert key and value to dictionary data
+ let dict_data = ArrayData::builder(field.data_type().clone())
+ .len(keys.len())
+ .add_buffer(keys.data().buffers()[0].clone())
+ .null_bit_buffer(Some(null_buf))
+ .add_child_data(values.into_data())
+ .build()
+ .unwrap();
+
+ let array = match dict_key {
+ DataType::Int8 => {
+ Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef
+ }
+ DataType::Int16 =>
Arc::new(Int16DictionaryArray::from(dict_data)),
+ DataType::Int32 =>
Arc::new(Int32DictionaryArray::from(dict_data)),
+ DataType::Int64 =>
Arc::new(Int64DictionaryArray::from(dict_data)),
+ DataType::UInt8 =>
Arc::new(UInt8DictionaryArray::from(dict_data)),
+ DataType::UInt16 =>
Arc::new(UInt16DictionaryArray::from(dict_data)),
+ DataType::UInt32 =>
Arc::new(UInt32DictionaryArray::from(dict_data)),
+ DataType::UInt64 =>
Arc::new(UInt64DictionaryArray::from(dict_data)),
+ _ => unreachable!(),
+ };
+ Ok(array)
+ }
+ _ => Err(ArrowError::JsonError(format!(
+ "Dictionary key type {:?} not supported",
+ dict_key
+ ))),
}
+}
+/// A helper to create a null buffer from a Vec<bool>
+fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
+ let num_bytes = bit_util::ceil(json_col.count, 8);
+ let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes,
false);
+ json_col
+ .validity
+ .clone()
+ .unwrap()
+ .iter()
+ .enumerate()
+ .for_each(|(i, v)| {
+ let null_slice = null_buf.as_slice_mut();
+ if *v != 0 {
+ bit_util::set_bit(null_slice, i);
+ }
+ });
+ null_buf.into()
+}
+
+impl ArrowJsonBatch {
pub fn from_batch(batch: &RecordBatch) -> ArrowJsonBatch {
let mut json_batch = ArrowJsonBatch {
count: batch.num_rows(),
@@ -496,217 +1037,6 @@ impl ArrowJsonBatch {
}
}
-/// Convert an Arrow JSON column/array into a vector of `Value`
-fn json_from_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value> {
- match data_type {
- DataType::List(field) => json_from_list_col(col, field.data_type()),
- DataType::FixedSizeList(field, list_size) => {
- json_from_fixed_size_list_col(col, field.data_type(), *list_size
as usize)
- }
- DataType::Struct(fields) => json_from_struct_col(col, fields),
- DataType::Map(field, keys_sorted) => json_from_map_col(col, field,
*keys_sorted),
- DataType::Int64
- | DataType::UInt64
- | DataType::Date64
- | DataType::Time64(_)
- | DataType::Timestamp(_, _)
- | DataType::Duration(_) => {
- // convert int64 data from strings to numbers
- let converted_col: Vec<Value> = col
- .data
- .clone()
- .unwrap()
- .iter()
- .map(|v| {
- Value::Number(match v {
- Value::Number(number) => number.clone(),
- Value::String(string) => VNumber::from(
- string
- .parse::<i64>()
- .expect("Unable to parse string as i64"),
- ),
- t => panic!("Cannot convert {} to number", t),
- })
- })
- .collect();
- merge_json_array(
- col.validity.as_ref().unwrap().as_slice(),
- converted_col.as_slice(),
- )
- }
- DataType::Null => vec![],
- _ => merge_json_array(
- col.validity.as_ref().unwrap().as_slice(),
- &col.data.clone().unwrap(),
- ),
- }
-}
-
-/// Merge VALIDITY and DATA vectors from a primitive data type into a `Value`
vector with nulls
-fn merge_json_array(validity: &[u8], data: &[Value]) -> Vec<Value> {
- validity
- .iter()
- .zip(data)
- .map(|(v, d)| match v {
- 0 => Value::Null,
- 1 => d.clone(),
- _ => panic!("Validity data should be 0 or 1"),
- })
- .collect()
-}
-
-/// Convert an Arrow JSON column/array of a `DataType::Struct` into a vector
of `Value`
-fn json_from_struct_col(col: &ArrowJsonColumn, fields: &[Field]) -> Vec<Value>
{
- let mut values = Vec::with_capacity(col.count);
-
- let children: Vec<Vec<Value>> = col
- .children
- .clone()
- .unwrap()
- .iter()
- .zip(fields)
- .map(|(child, field)| json_from_col(child, field.data_type()))
- .collect();
-
- // create a struct from children
- for j in 0..col.count {
- let mut map = serde_json::map::Map::new();
- for i in 0..children.len() {
- map.insert(fields[i].name().to_string(), children[i][j].clone());
- }
- values.push(Value::Object(map));
- }
-
- values
-}
-
-/// Convert an Arrow JSON column/array of a `DataType::List` into a vector of
`Value`
-fn json_from_list_col(col: &ArrowJsonColumn, data_type: &DataType) ->
Vec<Value> {
- let mut values = Vec::with_capacity(col.count);
-
- // get the inner array
- let child = &col.children.clone().expect("list type must have
children")[0];
- let offsets: Vec<usize> = col
- .offset
- .clone()
- .unwrap()
- .iter()
- .map(|o| match o {
- Value::String(s) => s.parse::<usize>().unwrap(),
- Value::Number(n) => n.as_u64().unwrap() as usize,
- _ => panic!(
- "Offsets should be numbers or strings that are convertible to
numbers"
- ),
- })
- .collect();
- let inner = match data_type {
- DataType::List(ref field) => json_from_col(child, field.data_type()),
- DataType::Struct(fields) => json_from_struct_col(col, fields),
- _ => merge_json_array(
- child.validity.as_ref().unwrap().as_slice(),
- &child.data.clone().unwrap(),
- ),
- };
-
- for i in 0..col.count {
- match &col.validity {
- Some(validity) => match &validity[i] {
- 0 => values.push(Value::Null),
- 1 => {
- values.push(Value::Array(inner[offsets[i]..offsets[i +
1]].to_vec()))
- }
- _ => panic!("Validity data should be 0 or 1"),
- },
- None => {
- // Null type does not have a validity vector
- }
- }
- }
-
- values
-}
-
-/// Convert an Arrow JSON column/array of a `DataType::List` into a vector of
`Value`
-fn json_from_fixed_size_list_col(
- col: &ArrowJsonColumn,
- data_type: &DataType,
- list_size: usize,
-) -> Vec<Value> {
- let mut values = Vec::with_capacity(col.count);
-
- // get the inner array
- let child = &col.children.clone().expect("list type must have
children")[0];
- let inner = match data_type {
- DataType::List(ref field) => json_from_col(child, field.data_type()),
- DataType::FixedSizeList(ref field, _) => json_from_col(child,
field.data_type()),
- DataType::Struct(fields) => json_from_struct_col(col, fields),
- _ => merge_json_array(
- child.validity.as_ref().unwrap().as_slice(),
- &child.data.clone().unwrap(),
- ),
- };
-
- for i in 0..col.count {
- match &col.validity {
- Some(validity) => match &validity[i] {
- 0 => values.push(Value::Null),
- 1 => values.push(Value::Array(
- inner[(list_size * i)..(list_size * (i + 1))].to_vec(),
- )),
- _ => panic!("Validity data should be 0 or 1"),
- },
- None => {}
- }
- }
-
- values
-}
-
-fn json_from_map_col(
- col: &ArrowJsonColumn,
- field: &Field,
- _keys_sorted: bool,
-) -> Vec<Value> {
- let mut values = Vec::with_capacity(col.count);
-
- // get the inner array
- let child = &col.children.clone().expect("list type must have
children")[0];
- let offsets: Vec<usize> = col
- .offset
- .clone()
- .unwrap()
- .iter()
- .map(|o| match o {
- Value::String(s) => s.parse::<usize>().unwrap(),
- Value::Number(n) => n.as_u64().unwrap() as usize,
- _ => panic!(
- "Offsets should be numbers or strings that are convertible to
numbers"
- ),
- })
- .collect();
-
- let inner = match field.data_type() {
- DataType::Struct(fields) => json_from_struct_col(child, fields),
- _ => panic!("Map child must be Struct"),
- };
-
- for i in 0..col.count {
- match &col.validity {
- Some(validity) => match &validity[i] {
- 0 => values.push(Value::Null),
- 1 => {
- values.push(Value::Array(inner[offsets[i]..offsets[i +
1]].to_vec()))
- }
- _ => panic!("Validity data should be 0 or 1"),
- },
- None => {
- // Null type does not have a validity vector
- }
- }
- }
-
- values
-}
#[cfg(test)]
mod tests {
use super::*;
@@ -945,22 +1275,25 @@ mod tests {
.len(3)
.add_buffer(value_offsets)
.add_child_data(value_data.into_data())
+ .null_bit_buffer(Some(Buffer::from([0b00000011])))
.build()
.unwrap();
let lists = ListArray::from(list_data);
let structs_int32s = Int32Array::from(vec![None, Some(-2), None]);
let structs_utf8s = StringArray::from(vec![None, None,
Some("aaaaaa")]);
- let structs = StructArray::from(vec![
- (
- Field::new("int32s", DataType::Int32, true),
- Arc::new(structs_int32s) as ArrayRef,
- ),
- (
- Field::new("utf8s", DataType::Utf8, true),
- Arc::new(structs_utf8s) as ArrayRef,
- ),
+ let struct_data_type = DataType::Struct(vec![
+ Field::new("int32s", DataType::Int32, true),
+ Field::new("utf8s", DataType::Utf8, true),
]);
+ let struct_data = ArrayData::builder(struct_data_type)
+ .len(3)
+ .add_child_data(structs_int32s.data().clone())
+ .add_child_data(structs_utf8s.data().clone())
+ .null_bit_buffer(Some(Buffer::from([0b00000011])))
+ .build()
+ .unwrap();
+ let structs = StructArray::from(struct_data);
let record_batch = RecordBatch::try_new(
Arc::new(schema.clone()),
@@ -1005,6 +1338,6 @@ mod tests {
// test schemas
assert!(arrow_json.schema.equals_schema(&schema));
// test record batch
- assert!(arrow_json.batches[0].equals_batch(&record_batch));
+ assert_eq!(arrow_json.get_record_batches().unwrap()[0], record_batch);
}
}
diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs
b/integration-testing/src/bin/arrow-json-integration-test.rs
index 69b73b19f..b442e8b5e 100644
--- a/integration-testing/src/bin/arrow-json-integration-test.rs
+++ b/integration-testing/src/bin/arrow-json-integration-test.rs
@@ -91,7 +91,10 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose:
bool) -> Result<()>
for f in reader.schema().fields() {
fields.push(ArrowJsonField::from(f));
}
- let schema = ArrowJsonSchema { fields };
+ let schema = ArrowJsonSchema {
+ fields,
+ metadata: None,
+ };
let batches = reader
.map(|batch| Ok(ArrowJsonBatch::from_batch(&batch?)))
diff --git a/integration-testing/src/lib.rs b/integration-testing/src/lib.rs
index 7be70bfa2..5d3da15d3 100644
--- a/integration-testing/src/lib.rs
+++ b/integration-testing/src/lib.rs
@@ -17,29 +17,17 @@
//! Common code used in the integration test binaries
-use hex::decode;
use serde_json::Value;
use arrow::util::integration_util::ArrowJsonBatch;
-use arrow::array::*;
-use arrow::datatypes::{DataType, Field, IntervalUnit, Schema};
-use arrow::error::{ArrowError, Result};
+use arrow::datatypes::Schema;
+use arrow::error::Result;
use arrow::record_batch::RecordBatch;
-use arrow::{
- buffer::Buffer,
- buffer::MutableBuffer,
- datatypes::ToByteSlice,
- util::{bit_util, integration_util::*},
-};
-
-use arrow::util::decimal::{BasicDecimal, Decimal256};
-use num::bigint::BigInt;
-use num::Signed;
+use arrow::util::integration_util::*;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
-use std::sync::Arc;
/// The expected username for the basic auth integration test.
pub const AUTH_USERNAME: &str = "arrow";
@@ -88,717 +76,3 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile>
{
batches,
})
}
-
-fn record_batch_from_json(
- schema: &Schema,
- json_batch: ArrowJsonBatch,
- json_dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
-) -> Result<RecordBatch> {
- let mut columns = vec![];
-
- for (field, json_col) in schema.fields().iter().zip(json_batch.columns) {
- let col = array_from_json(field, json_col, json_dictionaries)?;
- columns.push(col);
- }
-
- RecordBatch::try_new(Arc::new(schema.clone()), columns)
-}
-
-/// Construct an Arrow array from a partially typed JSON column
-fn array_from_json(
- field: &Field,
- json_col: ArrowJsonColumn,
- dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
-) -> Result<ArrayRef> {
- match field.data_type() {
- DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))),
- DataType::Boolean => {
- let mut b = BooleanBuilder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_bool().unwrap()),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Int8 => {
- let mut b = Int8Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_i64().ok_or_else(|| {
- ArrowError::JsonError(format!(
- "Unable to get {:?} as int64",
- value
- ))
- })? as i8),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Int16 => {
- let mut b = Int16Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_i64().unwrap() as i16),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Int32
- | DataType::Date32
- | DataType::Time32(_)
- | DataType::Interval(IntervalUnit::YearMonth) => {
- let mut b = Int32Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_i64().unwrap() as i32),
- _ => b.append_null(),
- };
- }
- let array = Arc::new(b.finish()) as ArrayRef;
- arrow::compute::cast(&array, field.data_type())
- }
- DataType::Int64
- | DataType::Date64
- | DataType::Time64(_)
- | DataType::Timestamp(_, _)
- | DataType::Duration(_)
- | DataType::Interval(IntervalUnit::DayTime) => {
- let mut b = Int64Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(match value {
- Value::Number(n) => n.as_i64().unwrap(),
- Value::String(s) => {
- s.parse().expect("Unable to parse string as i64")
- }
- Value::Object(ref map)
- if map.contains_key("days")
- && map.contains_key("milliseconds") =>
- {
- match field.data_type() {
- DataType::Interval(IntervalUnit::DayTime) => {
- let days = map.get("days").unwrap();
- let milliseconds =
map.get("milliseconds").unwrap();
-
- match (days, milliseconds) {
- (Value::Number(d), Value::Number(m))
=> {
- let mut bytes = [0_u8; 8];
- let m = (m.as_i64().unwrap() as
i32)
- .to_le_bytes();
- let d = (d.as_i64().unwrap() as
i32)
- .to_le_bytes();
-
- let c = [d, m].concat();
-
bytes.copy_from_slice(c.as_slice());
- i64::from_le_bytes(bytes)
- }
- _ => panic!(
- "Unable to parse {:?} as interval
daytime",
- value
- ),
- }
- }
- _ => panic!(
- "Unable to parse {:?} as interval daytime",
- value
- ),
- }
- }
- _ => panic!("Unable to parse {:?} as number", value),
- }),
- _ => b.append_null(),
- };
- }
- let array = Arc::new(b.finish()) as ArrayRef;
- arrow::compute::cast(&array, field.data_type())
- }
- DataType::UInt8 => {
- let mut b = UInt8Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_u64().unwrap() as u8),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::UInt16 => {
- let mut b = UInt16Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_u64().unwrap() as u16),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::UInt32 => {
- let mut b = UInt32Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_u64().unwrap() as u32),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::UInt64 => {
- let mut b = UInt64Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(
- value
- .as_str()
- .unwrap()
- .parse()
- .expect("Unable to parse string as u64"),
- ),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Interval(IntervalUnit::MonthDayNano) => {
- let mut b = IntervalMonthDayNanoBuilder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(match value {
- Value::Object(v) => {
- let months = v.get("months").unwrap();
- let days = v.get("days").unwrap();
- let nanoseconds = v.get("nanoseconds").unwrap();
- match (months, days, nanoseconds) {
- (
- Value::Number(months),
- Value::Number(days),
- Value::Number(nanoseconds),
- ) => {
- let months = months.as_i64().unwrap() as
i32;
- let days = days.as_i64().unwrap() as i32;
- let nanoseconds =
nanoseconds.as_i64().unwrap();
- let months_days_ns: i128 = ((nanoseconds
as i128)
- & 0xFFFFFFFFFFFFFFFF)
- << 64
- | ((days as i128) & 0xFFFFFFFF) << 32
- | ((months as i128) & 0xFFFFFFFF);
- months_days_ns
- }
- (_, _, _) => {
- panic!("Unable to parse {:?} as
MonthDayNano", v)
- }
- }
- }
- _ => panic!("Unable to parse {:?} as MonthDayNano",
value),
- }),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Float32 => {
- let mut b = Float32Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_f64().unwrap() as f32),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Float64 => {
- let mut b = Float64Builder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_f64().unwrap()),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Binary => {
- let mut b = BinaryBuilder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => {
- let v = decode(value.as_str().unwrap()).unwrap();
- b.append_value(&v)
- }
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::LargeBinary => {
- let mut b = LargeBinaryBuilder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => {
- let v = decode(value.as_str().unwrap()).unwrap();
- b.append_value(&v)
- }
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Utf8 => {
- let mut b = StringBuilder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_str().unwrap()),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::LargeUtf8 => {
- let mut b = LargeStringBuilder::new(json_col.count);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => b.append_value(value.as_str().unwrap()),
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::FixedSizeBinary(len) => {
- let mut b = FixedSizeBinaryBuilder::new(json_col.count, *len);
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => {
- let v = hex::decode(value.as_str().unwrap()).unwrap();
- b.append_value(&v)?
- }
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::List(child_field) => {
- let null_buf = create_null_buf(&json_col);
- let children = json_col.children.clone().unwrap();
- let child_array = array_from_json(
- child_field,
- children.get(0).unwrap().clone(),
- dictionaries,
- )?;
- let offsets: Vec<i32> = json_col
- .offset
- .unwrap()
- .iter()
- .map(|v| v.as_i64().unwrap() as i32)
- .collect();
- let list_data = ArrayData::builder(field.data_type().clone())
- .len(json_col.count)
- .offset(0)
- .add_buffer(Buffer::from(&offsets.to_byte_slice()))
- .add_child_data(child_array.into_data())
- .null_bit_buffer(Some(null_buf))
- .build()
- .unwrap();
- Ok(Arc::new(ListArray::from(list_data)))
- }
- DataType::LargeList(child_field) => {
- let null_buf = create_null_buf(&json_col);
- let children = json_col.children.clone().unwrap();
- let child_array = array_from_json(
- child_field,
- children.get(0).unwrap().clone(),
- dictionaries,
- )?;
- let offsets: Vec<i64> = json_col
- .offset
- .unwrap()
- .iter()
- .map(|v| match v {
- Value::Number(n) => n.as_i64().unwrap(),
- Value::String(s) => s.parse::<i64>().unwrap(),
- _ => panic!("64-bit offset must be either string or
number"),
- })
- .collect();
- let list_data = ArrayData::builder(field.data_type().clone())
- .len(json_col.count)
- .offset(0)
- .add_buffer(Buffer::from(&offsets.to_byte_slice()))
- .add_child_data(child_array.into_data())
- .null_bit_buffer(Some(null_buf))
- .build()
- .unwrap();
- Ok(Arc::new(LargeListArray::from(list_data)))
- }
- DataType::FixedSizeList(child_field, _) => {
- let children = json_col.children.clone().unwrap();
- let child_array = array_from_json(
- child_field,
- children.get(0).unwrap().clone(),
- dictionaries,
- )?;
- let null_buf = create_null_buf(&json_col);
- let list_data = ArrayData::builder(field.data_type().clone())
- .len(json_col.count)
- .add_child_data(child_array.into_data())
- .null_bit_buffer(Some(null_buf))
- .build()
- .unwrap();
- Ok(Arc::new(FixedSizeListArray::from(list_data)))
- }
- DataType::Struct(fields) => {
- // construct struct with null data
- let null_buf = create_null_buf(&json_col);
- let mut array_data = ArrayData::builder(field.data_type().clone())
- .len(json_col.count)
- .null_bit_buffer(Some(null_buf));
-
- for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
- let array = array_from_json(field, col, dictionaries)?;
- array_data = array_data.add_child_data(array.into_data());
- }
-
- let array = StructArray::from(array_data.build().unwrap());
- Ok(Arc::new(array))
- }
- DataType::Dictionary(key_type, value_type) => {
- let dict_id = field.dict_id().ok_or_else(|| {
- ArrowError::JsonError(format!(
- "Unable to find dict_id for field {:?}",
- field
- ))
- })?;
- // find dictionary
- let dictionary = dictionaries
- .ok_or_else(|| {
- ArrowError::JsonError(format!(
- "Unable to find any dictionaries for field {:?}",
- field
- ))
- })?
- .get(&dict_id);
- match dictionary {
- Some(dictionary) => dictionary_array_from_json(
- field,
- json_col,
- key_type,
- value_type,
- dictionary,
- dictionaries,
- ),
- None => Err(ArrowError::JsonError(format!(
- "Unable to find dictionary for field {:?}",
- field
- ))),
- }
- }
- DataType::Decimal128(precision, scale) => {
- let mut b = Decimal128Builder::new(json_col.count, *precision,
*scale);
- // C++ interop tests involve incompatible decimal values
- unsafe {
- b.disable_value_validation();
- }
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => {
-
b.append_value(value.as_str().unwrap().parse::<i128>().unwrap())?
- }
- _ => b.append_null(),
- };
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Decimal256(precision, scale) => {
- let mut b = Decimal256Builder::new(json_col.count, *precision,
*scale);
- // C++ interop tests involve incompatible decimal values
- unsafe {
- b.disable_value_validation();
- }
- for (is_valid, value) in json_col
- .validity
- .as_ref()
- .unwrap()
- .iter()
- .zip(json_col.data.unwrap())
- {
- match is_valid {
- 1 => {
- let str = value.as_str().unwrap();
- let integer = BigInt::parse_bytes(str.as_bytes(),
10).unwrap();
- let integer_bytes = integer.to_signed_bytes_le();
- let mut bytes = if integer.is_positive() {
- [0_u8; 32]
- } else {
- [255_u8; 32]
- };
- bytes[0..integer_bytes.len()]
- .copy_from_slice(integer_bytes.as_slice());
- let decimal =
- Decimal256::try_new_from_bytes(*precision, *scale,
&bytes)
- .unwrap();
- b.append_value(&decimal)?;
- }
- _ => b.append_null(),
- }
- }
- Ok(Arc::new(b.finish()))
- }
- DataType::Map(child_field, _) => {
- let null_buf = create_null_buf(&json_col);
- let children = json_col.children.clone().unwrap();
- let child_array = array_from_json(
- child_field,
- children.get(0).unwrap().clone(),
- dictionaries,
- )?;
- let offsets: Vec<i32> = json_col
- .offset
- .unwrap()
- .iter()
- .map(|v| v.as_i64().unwrap() as i32)
- .collect();
- let array_data = ArrayData::builder(field.data_type().clone())
- .len(json_col.count)
- .add_buffer(Buffer::from(&offsets.to_byte_slice()))
- .add_child_data(child_array.into_data())
- .null_bit_buffer(Some(null_buf))
- .build()
- .unwrap();
-
- let array = MapArray::from(array_data);
- Ok(Arc::new(array))
- }
- DataType::Union(fields, field_type_ids, _) => {
- let type_ids = if let Some(type_id) = json_col.type_id {
- type_id
- } else {
- return Err(ArrowError::JsonError(
- "Cannot find expected type_id in json column".to_string(),
- ));
- };
-
- let offset: Option<Buffer> = json_col.offset.map(|offsets| {
- let offsets: Vec<i32> =
- offsets.iter().map(|v| v.as_i64().unwrap() as
i32).collect();
- Buffer::from(&offsets.to_byte_slice())
- });
-
- let mut children: Vec<(Field, Arc<dyn Array>)> = vec![];
- for (field, col) in fields.iter().zip(json_col.children.unwrap()) {
- let array = array_from_json(field, col, dictionaries)?;
- children.push((field.clone(), array));
- }
-
- let array = UnionArray::try_new(
- field_type_ids,
- Buffer::from(&type_ids.to_byte_slice()),
- offset,
- children,
- )
- .unwrap();
- Ok(Arc::new(array))
- }
- t => Err(ArrowError::JsonError(format!(
- "data type {:?} not supported",
- t
- ))),
- }
-}
-
-fn dictionary_array_from_json(
- field: &Field,
- json_col: ArrowJsonColumn,
- dict_key: &DataType,
- dict_value: &DataType,
- dictionary: &ArrowJsonDictionaryBatch,
- dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
-) -> Result<ArrayRef> {
- match dict_key {
- DataType::Int8
- | DataType::Int16
- | DataType::Int32
- | DataType::Int64
- | DataType::UInt8
- | DataType::UInt16
- | DataType::UInt32
- | DataType::UInt64 => {
- let null_buf = create_null_buf(&json_col);
-
- // build the key data into a buffer, then construct values
separately
- let key_field = Field::new_dict(
- "key",
- dict_key.clone(),
- field.is_nullable(),
- field
- .dict_id()
- .expect("Dictionary fields must have a dict_id value"),
- field
- .dict_is_ordered()
- .expect("Dictionary fields must have a dict_is_ordered
value"),
- );
- let keys = array_from_json(&key_field, json_col, None)?;
- // note: not enough info on nullability of dictionary
- let value_field = Field::new("value", dict_value.clone(), true);
- let values = array_from_json(
- &value_field,
- dictionary.data.columns[0].clone(),
- dictionaries,
- )?;
-
- // convert key and value to dictionary data
- let dict_data = ArrayData::builder(field.data_type().clone())
- .len(keys.len())
- .add_buffer(keys.data().buffers()[0].clone())
- .null_bit_buffer(Some(null_buf))
- .add_child_data(values.into_data())
- .build()
- .unwrap();
-
- let array = match dict_key {
- DataType::Int8 => {
- Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef
- }
- DataType::Int16 =>
Arc::new(Int16DictionaryArray::from(dict_data)),
- DataType::Int32 =>
Arc::new(Int32DictionaryArray::from(dict_data)),
- DataType::Int64 =>
Arc::new(Int64DictionaryArray::from(dict_data)),
- DataType::UInt8 =>
Arc::new(UInt8DictionaryArray::from(dict_data)),
- DataType::UInt16 =>
Arc::new(UInt16DictionaryArray::from(dict_data)),
- DataType::UInt32 =>
Arc::new(UInt32DictionaryArray::from(dict_data)),
- DataType::UInt64 =>
Arc::new(UInt64DictionaryArray::from(dict_data)),
- _ => unreachable!(),
- };
- Ok(array)
- }
- _ => Err(ArrowError::JsonError(format!(
- "Dictionary key type {:?} not supported",
- dict_key
- ))),
- }
-}
-
-/// A helper to create a null buffer from a Vec<bool>
-fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer {
- let num_bytes = bit_util::ceil(json_col.count, 8);
- let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes,
false);
- json_col
- .validity
- .clone()
- .unwrap()
- .iter()
- .enumerate()
- .for_each(|(i, v)| {
- let null_slice = null_buf.as_slice_mut();
- if *v != 0 {
- bit_util::set_bit(null_slice, i);
- }
- });
- null_buf.into()
-}
diff --git a/parquet/src/arrow/arrow_reader.rs
b/parquet/src/arrow/arrow_reader.rs
index 3cd5cb9d4..67bd2a619 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -392,8 +392,6 @@ mod tests {
use std::sync::Arc;
use rand::{thread_rng, RngCore};
- use serde_json::json;
- use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
use tempfile::tempfile;
use arrow::array::*;
@@ -427,50 +425,31 @@ mod tests {
#[test]
fn test_arrow_reader_all_columns() {
- let json_values =
get_json_array("parquet/generated_simple_numerics/blogs.json");
-
let parquet_file_reader =
get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
- let max_len =
parquet_file_reader.metadata().file_metadata().num_rows() as usize;
-
let mut arrow_reader =
ParquetFileArrowReader::new(parquet_file_reader);
- let mut record_batch_reader = arrow_reader
+ let record_batch_reader = arrow_reader
.get_record_reader(60)
.expect("Failed to read into array!");
// Verify that the schema was correctly parsed
let original_schema =
arrow_reader.get_schema().unwrap().fields().clone();
assert_eq!(original_schema, *record_batch_reader.schema().fields());
-
- compare_batch_json(&mut record_batch_reader, json_values, max_len);
}
#[test]
fn test_arrow_reader_single_column() {
- let json_values =
get_json_array("parquet/generated_simple_numerics/blogs.json");
-
- let projected_json_values = json_values
- .into_iter()
- .map(|value| match value {
- JObject(fields) => {
- json!({ "blog_id":
fields.get("blog_id").unwrap_or(&JNull).clone()})
- }
- _ => panic!("Input should be json object array!"),
- })
- .collect::<Vec<_>>();
-
let parquet_file_reader =
get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
let file_metadata = parquet_file_reader.metadata().file_metadata();
- let max_len = file_metadata.num_rows() as usize;
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [2]);
let mut arrow_reader =
ParquetFileArrowReader::new(parquet_file_reader);
- let mut record_batch_reader = arrow_reader
+ let record_batch_reader = arrow_reader
.get_record_reader_by_columns(mask, 60)
.expect("Failed to read into array!");
@@ -478,8 +457,6 @@ mod tests {
let original_schema =
arrow_reader.get_schema().unwrap().fields().clone();
assert_eq!(1, record_batch_reader.schema().fields().len());
assert_eq!(original_schema[1],
record_batch_reader.schema().fields()[0]);
-
- compare_batch_json(&mut record_batch_reader, projected_json_values,
max_len);
}
#[test]
@@ -1257,39 +1234,6 @@ mod tests {
File::open(path.as_path()).expect("File not found!")
}
- fn get_json_array(filename: &str) -> Vec<serde_json::Value> {
- match serde_json::from_reader(get_test_file(filename))
- .expect("Failed to read json value from file!")
- {
- JArray(values) => values,
- _ => panic!("Input should be json array!"),
- }
- }
-
- fn compare_batch_json(
- record_batch_reader: &mut dyn RecordBatchReader,
- json_values: Vec<serde_json::Value>,
- max_len: usize,
- ) {
- for i in 0..20 {
- let array: Option<StructArray> = record_batch_reader
- .next()
- .map(|r| r.expect("Failed to read record batch!").into());
-
- let (start, end) = (i * 60_usize, (i + 1) * 60_usize);
-
- if start < max_len {
- assert!(array.is_some());
- assert_ne!(0, array.as_ref().unwrap().len());
- let end = min(end, max_len);
- let json = JArray(Vec::from(&json_values[start..end]));
- assert_eq!(array.unwrap(), json)
- } else {
- assert!(array.is_none());
- }
- }
- }
-
#[test]
fn test_read_structs() {
// This particular test file has columns of struct types where there is