This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9bf8bfbab Upgrade to arrow 15 (#2631)
9bf8bfbab is described below
commit 9bf8bfbab47823effcbc78f98a675be64221f7cf
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue May 31 13:13:50 2022 +0100
Upgrade to arrow 15 (#2631)
* Prepare for arrow 15 release
* Update to arrow 15.0.0
* Update ballista pin
---
Cargo.toml | 1 -
datafusion-cli/Cargo.toml | 2 +-
datafusion-examples/Cargo.toml | 2 +-
datafusion/common/Cargo.toml | 4 ++--
datafusion/common/src/scalar.rs | 2 +-
datafusion/core/Cargo.toml | 4 ++--
datafusion/core/fuzz-utils/Cargo.toml | 2 +-
datafusion/core/src/avro_to_arrow/arrow_array_reader.rs | 9 ++++-----
datafusion/core/src/avro_to_arrow/schema.rs | 5 +++--
datafusion/core/src/physical_plan/common.rs | 2 +-
datafusion/core/src/physical_plan/file_format/parquet.rs | 13 +++++++++----
datafusion/expr/Cargo.toml | 2 +-
datafusion/jit/Cargo.toml | 2 +-
datafusion/physical-expr/Cargo.toml | 2 +-
datafusion/physical-expr/src/expressions/case.rs | 4 ++--
datafusion/proto/Cargo.toml | 2 +-
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/from_proto.rs | 11 +++++++++--
datafusion/proto/src/lib.rs | 4 ++++
datafusion/proto/src/to_proto.rs | 10 ++++------
datafusion/row/Cargo.toml | 2 +-
datafusion/sql/Cargo.toml | 2 +-
dev/build-arrow-ballista.sh | 2 +-
23 files changed, 52 insertions(+), 38 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index db2cc46e9..1cc7aa6eb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,4 +34,3 @@ exclude = ["datafusion-cli"]
[profile.release]
codegen-units = 1
lto = true
-
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 0c210bc0e..d2d2ba5e4 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -29,7 +29,7 @@ rust-version = "1.59"
readme = "README.md"
[dependencies]
-arrow = { version = "14.0.0" }
+arrow = { version = "15.0.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "8.0.0" }
dirs = "4.0.0"
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 963efdf0d..399248f1d 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]
[dev-dependencies]
-arrow-flight = { version = "14.0.0" }
+arrow-flight = { version = "15.0.0" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index a8f1032fd..bf52bc26d 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -38,10 +38,10 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]
[dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.84.0", optional = true }
ordered-float = "3.0"
-parquet = { version = "14.0.0", features = ["arrow"], optional = true }
+parquet = { version = "15.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.17"
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 758cc18bb..775f2ef88 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1059,7 +1059,7 @@ impl ScalarValue {
let offsets_array = offsets.finish();
let array_data = ArrayDataBuilder::new(data_type.clone())
.len(offsets_array.len() - 1)
- .null_bit_buffer(valid.finish())
+ .null_bit_buffer(Some(valid.finish()))
.add_buffer(offsets_array.data().buffers()[0].clone())
.add_child_data(flat_array.data().clone());
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 08da53f9c..79a48cf15 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -55,7 +55,7 @@ unicode_expressions =
["datafusion-physical-expr/regex_expressions", "datafusion
[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
chrono = { version = "0.4", default-features = false }
@@ -76,7 +76,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
ordered-float = "3.0"
parking_lot = "0.12"
-parquet = { version = "14.0.0", features = ["arrow"] }
+parquet = { version = "15.0.0", features = ["arrow"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/core/fuzz-utils/Cargo.toml b/datafusion/core/fuzz-utils/Cargo.toml
index 4089ad74e..aae7c108a 100644
--- a/datafusion/core/fuzz-utils/Cargo.toml
+++ b/datafusion/core/fuzz-utils/Cargo.toml
@@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
index dfaa98555..663dffa15 100644
--- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -484,7 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
ArrayData::builder(list_field.data_type().clone())
.len(valid_len)
.add_buffer(bool_values.into())
- .null_bit_buffer(bool_nulls.into())
+ .null_bit_buffer(Some(bool_nulls.into()))
.build()
.unwrap()
}
@@ -567,10 +567,9 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let arrays =
self.build_struct_array(rows.as_slice(),
fields.as_slice(), &[])?;
let data_type = DataType::Struct(fields.clone());
- let buf = null_buffer.into();
ArrayDataBuilder::new(data_type)
.len(rows.len())
- .null_bit_buffer(buf)
+ .null_bit_buffer(Some(null_buffer.into()))
.child_data(arrays.into_iter().map(|a|
a.data().clone()).collect())
.build()
.unwrap()
@@ -587,7 +586,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.len(list_len)
.add_buffer(Buffer::from_slice_ref(&offsets))
.add_child_data(array_data)
- .null_bit_buffer(list_nulls.into())
+ .null_bit_buffer(Some(list_nulls.into()))
.build()
.unwrap();
Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
@@ -778,7 +777,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let data_type = DataType::Struct(fields.clone());
let data = ArrayDataBuilder::new(data_type)
.len(len)
- .null_bit_buffer(null_buffer.into())
+ .null_bit_buffer(Some(null_buffer.into()))
.child_data(
arrays.into_iter().map(|a|
a.data().clone()).collect(),
)
diff --git a/datafusion/core/src/avro_to_arrow/schema.rs b/datafusion/core/src/avro_to_arrow/schema.rs
index 2e9a17de3..dd8b90776 100644
--- a/datafusion/core/src/avro_to_arrow/schema.rs
+++ b/datafusion/core/src/avro_to_arrow/schema.rs
@@ -103,7 +103,8 @@ fn schema_to_field_with_props(
.iter()
.map(|s| schema_to_field_with_props(s, None, has_nullable,
None))
.collect::<Result<Vec<Field>>>()?;
- DataType::Union(fields, UnionMode::Dense)
+ let type_ids = (0_i8..fields.len() as i8).collect();
+ DataType::Union(fields, type_ids, UnionMode::Dense)
}
}
AvroSchema::Record { name, fields, .. } => {
@@ -212,7 +213,7 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::FixedSizeList(_, _) => "fixed_size_list",
DataType::LargeList(_) => "largelist",
DataType::Struct(_) => "struct",
- DataType::Union(_, _) => "union",
+ DataType::Union(_, _, _) => "union",
DataType::Dictionary(_, _) => "map",
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
DataType::Decimal(_, _) => "decimal",
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 24df647dc..e5230a5c1 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -365,7 +365,7 @@ mod tests {
let expected = Statistics {
is_exact: true,
num_rows: Some(3),
- total_byte_size: Some(416), // this might change a bit if the way
we compute the size changes
+ total_byte_size: Some(464), // this might change a bit if the way
we compute the size changes
column_statistics: Some(vec![
ColumnStatistics {
distinct_count: None,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 9eda036a4..81c4dd427 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -37,8 +37,9 @@ use futures::{Stream, StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::{
arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
- ParquetFileArrowReader,
+ ParquetFileArrowReader, ProjectionMask,
};
+use parquet::file::reader::FileReader;
use parquet::file::{
metadata::RowGroupMetaData, properties::WriterProperties,
reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
@@ -352,14 +353,18 @@ impl ParquetExecStream {
opt.build(),
)?;
+ let file_metadata = file_reader.metadata().file_metadata();
+ let parquet_schema = file_metadata.schema_descr_ptr();
+
let mut arrow_reader =
ParquetFileArrowReader::new(Arc::new(file_reader));
+ let arrow_schema = arrow_reader.get_schema()?;
let adapted_projections = self
.adapter
- .map_projections(&arrow_reader.get_schema()?, &self.projection)?;
+ .map_projections(&arrow_schema, &self.projection)?;
- let reader = arrow_reader
- .get_record_reader_by_columns(adapted_projections,
self.batch_size)?;
+ let mask = ProjectionMask::roots(&parquet_schema, adapted_projections);
+ let reader = arrow_reader.get_record_reader_by_columns(mask,
self.batch_size)?;
Ok(reader)
}
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index 22c73e6a9..8252013ef 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -36,6 +36,6 @@ path = "src/lib.rs"
[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "8.0.0" }
sqlparser = "0.17"
diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml
index a8ca329bf..ba45a799e 100644
--- a/datafusion/jit/Cargo.toml
+++ b/datafusion/jit/Cargo.toml
@@ -36,7 +36,7 @@ path = "src/lib.rs"
jit = []
[dependencies]
-arrow = { version = "14.0.0" }
+arrow = { version = "15.0.0" }
cranelift = "0.84.0"
cranelift-jit = "0.84.0"
cranelift-module = "0.84.0"
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index c6c286f0b..f4226ba33 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]
[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index e7db10d17..d2a0fac99 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -594,10 +594,10 @@ mod tests {
.collect();
//let valid_array = vec![true, false, false, true, false, tru
- let null_buffer = Buffer::from_slice_ref(&[0b00101001u8]);
+ let null_buffer = Buffer::from([0b00101001u8]);
let load4 = ArrayDataBuilder::new(load4.data_type().clone())
.len(load4.len())
- .null_bit_buffer(null_buffer)
+ .null_bit_buffer(Some(null_buffer))
.buffers(load4.data().buffers().to_vec())
.build()
.unwrap();
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index e773bd55f..f0df2d407 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -35,7 +35,7 @@ path = "src/lib.rs"
[features]
[dependencies]
-arrow = { version = "14.0.0" }
+arrow = { version = "15.0.0" }
datafusion = { path = "../core", version = "8.0.0" }
datafusion-common = { path = "../common", version = "8.0.0" }
datafusion-expr = { path = "../expr", version = "8.0.0" }
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index a990e175d..d57871442 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -616,6 +616,7 @@ enum UnionMode{
message Union{
repeated Field union_types = 1;
UnionMode union_mode = 2;
+ repeated int32 type_ids = 3;
}
message ScalarListValue{
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index fd72db4ed..6717e5b47 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -338,9 +338,16 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
let union_types = union
.union_types
.iter()
- .map(|field| field.try_into())
+ .map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?;
- DataType::Union(union_types, union_mode)
+
+ // Default to index based type ids if not provided
+ let type_ids = match union.type_ids.is_empty() {
+ true => (0..union_types.len() as i8).collect(),
+ false => union.type_ids.iter().map(|i| *i as i8).collect(),
+ };
+
+ DataType::Union(union_types, type_ids, union_mode)
}
arrow_type::ArrowTypeEnum::Dictionary(dict) => {
let key_datatype =
dict.as_ref().key.as_deref().required("key")?;
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 24ac70346..6fe1aac68 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -568,6 +568,7 @@ mod roundtrip_tests {
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
],
+ vec![0, 2, 3],
UnionMode::Dense,
),
DataType::Union(
@@ -585,6 +586,7 @@ mod roundtrip_tests {
true,
),
],
+ vec![1, 2, 3],
UnionMode::Sparse,
),
DataType::Dictionary(
@@ -720,6 +722,7 @@ mod roundtrip_tests {
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
],
+ vec![7, 5, 3],
UnionMode::Sparse,
),
DataType::Union(
@@ -737,6 +740,7 @@ mod roundtrip_tests {
true,
),
],
+ vec![5, 8, 1],
UnionMode::Dense,
),
DataType::Dictionary(
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index 7aa4278b3..70c4d57c4 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -200,17 +200,15 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
.map(|field| field.into())
.collect::<Vec<_>>(),
}),
- DataType::Union(union_types, union_mode) => {
+ DataType::Union(union_types, type_ids, union_mode) => {
let union_mode = match union_mode {
UnionMode::Sparse => protobuf::UnionMode::Sparse,
UnionMode::Dense => protobuf::UnionMode::Dense,
};
Self::Union(protobuf::Union {
- union_types: union_types
- .iter()
- .map(|field| field.into())
- .collect::<Vec<_>>(),
+ union_types: union_types.iter().map(Into::into).collect(),
union_mode: union_mode.into(),
+ type_ids: type_ids.iter().map(|x| *x as i32).collect(),
})
}
DataType::Dictionary(key_type, value_type) => {
@@ -1188,7 +1186,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
- | DataType::Union(_, _)
+ | DataType::Union(_, _, _)
| DataType::Dictionary(_, _)
| DataType::Map(_, _)
| DataType::Decimal(_, _) => {
diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml
index f9a150699..bb755ecea 100644
--- a/datafusion/row/Cargo.toml
+++ b/datafusion/row/Cargo.toml
@@ -37,7 +37,7 @@ path = "src/lib.rs"
jit = ["datafusion-jit"]
[dependencies]
-arrow = { version = "14.0.0" }
+arrow = { version = "15.0.0" }
datafusion-common = { path = "../common", version = "8.0.0" }
datafusion-jit = { path = "../jit", version = "8.0.0", optional = true }
paste = "^1.0"
diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml
index 673823c37..cd2172a65 100644
--- a/datafusion/sql/Cargo.toml
+++ b/datafusion/sql/Cargo.toml
@@ -38,7 +38,7 @@ unicode_expressions = []
[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "8.0.0" }
datafusion-expr = { path = "../expr", version = "8.0.0" }
hashbrown = "0.12"
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 8c2e687f3..12b6e9bc0 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null
# clone the repo
# TODO make repo/branch configurable
-git clone https://github.com/tustvold/arrow-ballista -b url-refactor
+git clone https://github.com/tustvold/arrow-ballista -b arrow-15
# update dependencies to local crates
python ./dev/make-ballista-deps-local.py