This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bf8bfbab Upgrade to arrow 15 (#2631)
9bf8bfbab is described below

commit 9bf8bfbab47823effcbc78f98a675be64221f7cf
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue May 31 13:13:50 2022 +0100

    Upgrade to arrow 15 (#2631)
    
    * Prepare for arrow 15 release
    
    * Update to arrow 15.0.0
    
    * Update ballista pin
---
 Cargo.toml                                               |  1 -
 datafusion-cli/Cargo.toml                                |  2 +-
 datafusion-examples/Cargo.toml                           |  2 +-
 datafusion/common/Cargo.toml                             |  4 ++--
 datafusion/common/src/scalar.rs                          |  2 +-
 datafusion/core/Cargo.toml                               |  4 ++--
 datafusion/core/fuzz-utils/Cargo.toml                    |  2 +-
 datafusion/core/src/avro_to_arrow/arrow_array_reader.rs  |  9 ++++-----
 datafusion/core/src/avro_to_arrow/schema.rs              |  5 +++--
 datafusion/core/src/physical_plan/common.rs              |  2 +-
 datafusion/core/src/physical_plan/file_format/parquet.rs | 13 +++++++++----
 datafusion/expr/Cargo.toml                               |  2 +-
 datafusion/jit/Cargo.toml                                |  2 +-
 datafusion/physical-expr/Cargo.toml                      |  2 +-
 datafusion/physical-expr/src/expressions/case.rs         |  4 ++--
 datafusion/proto/Cargo.toml                              |  2 +-
 datafusion/proto/proto/datafusion.proto                  |  1 +
 datafusion/proto/src/from_proto.rs                       | 11 +++++++++--
 datafusion/proto/src/lib.rs                              |  4 ++++
 datafusion/proto/src/to_proto.rs                         | 10 ++++------
 datafusion/row/Cargo.toml                                |  2 +-
 datafusion/sql/Cargo.toml                                |  2 +-
 dev/build-arrow-ballista.sh                              |  2 +-
 23 files changed, 52 insertions(+), 38 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index db2cc46e9..1cc7aa6eb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,4 +34,3 @@ exclude = ["datafusion-cli"]
 [profile.release]
 codegen-units = 1
 lto = true
-
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 0c210bc0e..d2d2ba5e4 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -29,7 +29,7 @@ rust-version = "1.59"
 readme = "README.md"
 
 [dependencies]
-arrow = { version = "14.0.0" }
+arrow = { version = "15.0.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
 datafusion = { path = "../datafusion/core", version = "8.0.0" }
 dirs = "4.0.0"
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 963efdf0d..399248f1d 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
 required-features = ["datafusion/avro"]
 
 [dev-dependencies]
-arrow-flight = { version = "14.0.0" }
+arrow-flight = { version = "15.0.0" }
 async-trait = "0.1.41"
 datafusion = { path = "../datafusion/core" }
 futures = "0.3"
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index a8f1032fd..bf52bc26d 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -38,10 +38,10 @@ jit = ["cranelift-module"]
 pyarrow = ["pyo3"]
 
 [dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 cranelift-module = { version = "0.84.0", optional = true }
 ordered-float = "3.0"
-parquet = { version = "14.0.0", features = ["arrow"], optional = true }
+parquet = { version = "15.0.0", features = ["arrow"], optional = true }
 pyo3 = { version = "0.16", optional = true }
 sqlparser = "0.17"
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 758cc18bb..775f2ef88 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1059,7 +1059,7 @@ impl ScalarValue {
         let offsets_array = offsets.finish();
         let array_data = ArrayDataBuilder::new(data_type.clone())
             .len(offsets_array.len() - 1)
-            .null_bit_buffer(valid.finish())
+            .null_bit_buffer(Some(valid.finish()))
             .add_buffer(offsets_array.data().buffers()[0].clone())
             .add_child_data(flat_array.data().clone());
 
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 08da53f9c..79a48cf15 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 async-trait = "0.1.41"
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 chrono = { version = "0.4", default-features = false }
@@ -76,7 +76,7 @@ num-traits = { version = "0.2", optional = true }
 num_cpus = "1.13.0"
 ordered-float = "3.0"
 parking_lot = "0.12"
-parquet = { version = "14.0.0", features = ["arrow"] }
+parquet = { version = "15.0.0", features = ["arrow"] }
 paste = "^1.0"
 pin-project-lite = "^0.2.7"
 pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/core/fuzz-utils/Cargo.toml b/datafusion/core/fuzz-utils/Cargo.toml
index 4089ad74e..aae7c108a 100644
--- a/datafusion/core/fuzz-utils/Cargo.toml
+++ b/datafusion/core/fuzz-utils/Cargo.toml
@@ -23,6 +23,6 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 env_logger = "0.9.0"
 rand = "0.8"
diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
index dfaa98555..663dffa15 100644
--- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -484,7 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                 ArrayData::builder(list_field.data_type().clone())
                     .len(valid_len)
                     .add_buffer(bool_values.into())
-                    .null_bit_buffer(bool_nulls.into())
+                    .null_bit_buffer(Some(bool_nulls.into()))
                     .build()
                     .unwrap()
             }
@@ -567,10 +567,9 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                 let arrays =
                     self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?;
                 let data_type = DataType::Struct(fields.clone());
-                let buf = null_buffer.into();
                 ArrayDataBuilder::new(data_type)
                     .len(rows.len())
-                    .null_bit_buffer(buf)
+                    .null_bit_buffer(Some(null_buffer.into()))
                     .child_data(arrays.into_iter().map(|a| a.data().clone()).collect())
                     .build()
                     .unwrap()
@@ -587,7 +586,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
             .len(list_len)
             .add_buffer(Buffer::from_slice_ref(&offsets))
             .add_child_data(array_data)
-            .null_bit_buffer(list_nulls.into())
+            .null_bit_buffer(Some(list_nulls.into()))
             .build()
             .unwrap();
         Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
@@ -778,7 +777,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                         let data_type = DataType::Struct(fields.clone());
                         let data = ArrayDataBuilder::new(data_type)
                             .len(len)
-                            .null_bit_buffer(null_buffer.into())
+                            .null_bit_buffer(Some(null_buffer.into()))
                             .child_data(
                                 arrays.into_iter().map(|a| a.data().clone()).collect(),
                             )
diff --git a/datafusion/core/src/avro_to_arrow/schema.rs b/datafusion/core/src/avro_to_arrow/schema.rs
index 2e9a17de3..dd8b90776 100644
--- a/datafusion/core/src/avro_to_arrow/schema.rs
+++ b/datafusion/core/src/avro_to_arrow/schema.rs
@@ -103,7 +103,8 @@ fn schema_to_field_with_props(
                     .iter()
                     .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
                     .collect::<Result<Vec<Field>>>()?;
-                DataType::Union(fields, UnionMode::Dense)
+                let type_ids = (0_i8..fields.len() as i8).collect();
+                DataType::Union(fields, type_ids, UnionMode::Dense)
             }
         }
         AvroSchema::Record { name, fields, .. } => {
@@ -212,7 +213,7 @@ fn default_field_name(dt: &DataType) -> &str {
         DataType::FixedSizeList(_, _) => "fixed_size_list",
         DataType::LargeList(_) => "largelist",
         DataType::Struct(_) => "struct",
-        DataType::Union(_, _) => "union",
+        DataType::Union(_, _, _) => "union",
         DataType::Dictionary(_, _) => "map",
         DataType::Map(_, _) => unimplemented!("Map support not implemented"),
         DataType::Decimal(_, _) => "decimal",
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 24df647dc..e5230a5c1 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -365,7 +365,7 @@ mod tests {
         let expected = Statistics {
             is_exact: true,
             num_rows: Some(3),
-            total_byte_size: Some(416), // this might change a bit if the way we compute the size changes
+            total_byte_size: Some(464), // this might change a bit if the way we compute the size changes
             column_statistics: Some(vec![
                 ColumnStatistics {
                     distinct_count: None,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 9eda036a4..81c4dd427 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -37,8 +37,9 @@ use futures::{Stream, StreamExt, TryStreamExt};
 use log::debug;
 use parquet::arrow::{
     arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
-    ParquetFileArrowReader,
+    ParquetFileArrowReader, ProjectionMask,
 };
+use parquet::file::reader::FileReader;
 use parquet::file::{
     metadata::RowGroupMetaData, properties::WriterProperties,
     reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
@@ -352,14 +353,18 @@ impl ParquetExecStream {
             opt.build(),
         )?;
 
+        let file_metadata = file_reader.metadata().file_metadata();
+        let parquet_schema = file_metadata.schema_descr_ptr();
+
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+        let arrow_schema = arrow_reader.get_schema()?;
 
         let adapted_projections = self
             .adapter
-            .map_projections(&arrow_reader.get_schema()?, &self.projection)?;
+            .map_projections(&arrow_schema, &self.projection)?;
 
-        let reader = arrow_reader
-            .get_record_reader_by_columns(adapted_projections, self.batch_size)?;
+        let mask = ProjectionMask::roots(&parquet_schema, adapted_projections);
+        let reader = arrow_reader.get_record_reader_by_columns(mask, self.batch_size)?;
 
         Ok(reader)
     }
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index 22c73e6a9..8252013ef 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -36,6 +36,6 @@ path = "src/lib.rs"
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 datafusion-common = { path = "../common", version = "8.0.0" }
 sqlparser = "0.17"
diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml
index a8ca329bf..ba45a799e 100644
--- a/datafusion/jit/Cargo.toml
+++ b/datafusion/jit/Cargo.toml
@@ -36,7 +36,7 @@ path = "src/lib.rs"
 jit = []
 
 [dependencies]
-arrow = { version = "14.0.0" }
+arrow = { version = "15.0.0" }
 cranelift = "0.84.0"
 cranelift-jit = "0.84.0"
 cranelift-module = "0.84.0"
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index c6c286f0b..f4226ba33 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 blake2 = { version = "^0.10.2", optional = true }
 blake3 = { version = "1.0", optional = true }
 chrono = { version = "0.4", default-features = false }
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index e7db10d17..d2a0fac99 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -594,10 +594,10 @@ mod tests {
         .collect();
 
         //let valid_array = vec![true, false, false, true, false, tru
-        let null_buffer = Buffer::from_slice_ref(&[0b00101001u8]);
+        let null_buffer = Buffer::from([0b00101001u8]);
         let load4 = ArrayDataBuilder::new(load4.data_type().clone())
             .len(load4.len())
-            .null_bit_buffer(null_buffer)
+            .null_bit_buffer(Some(null_buffer))
             .buffers(load4.data().buffers().to_vec())
             .build()
             .unwrap();
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index e773bd55f..f0df2d407 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -35,7 +35,7 @@ path = "src/lib.rs"
 [features]
 
 [dependencies]
-arrow = { version = "14.0.0" }
+arrow = { version = "15.0.0" }
 datafusion = { path = "../core", version = "8.0.0" }
 datafusion-common = { path = "../common", version = "8.0.0" }
 datafusion-expr = { path = "../expr", version = "8.0.0" }
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index a990e175d..d57871442 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -616,6 +616,7 @@ enum UnionMode{
 message Union{
     repeated Field union_types = 1;
     UnionMode union_mode = 2;
+    repeated int32 type_ids = 3;
 }
 
 message ScalarListValue{
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index fd72db4ed..6717e5b47 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -338,9 +338,16 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
                 let union_types = union
                     .union_types
                     .iter()
-                    .map(|field| field.try_into())
+                    .map(TryInto::try_into)
                     .collect::<Result<Vec<_>, _>>()?;
-                DataType::Union(union_types, union_mode)
+
+                // Default to index based type ids if not provided
+                let type_ids = match union.type_ids.is_empty() {
+                    true => (0..union_types.len() as i8).collect(),
+                    false => union.type_ids.iter().map(|i| *i as i8).collect(),
+                };
+
+                DataType::Union(union_types, type_ids, union_mode)
             }
             arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                 let key_datatype = dict.as_ref().key.as_deref().required("key")?;
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 24ac70346..6fe1aac68 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -568,6 +568,7 @@ mod roundtrip_tests {
                     Field::new("name", DataType::Utf8, false),
                     Field::new("datatype", DataType::Binary, false),
                 ],
+                vec![0, 2, 3],
                 UnionMode::Dense,
             ),
             DataType::Union(
@@ -585,6 +586,7 @@ mod roundtrip_tests {
                         true,
                     ),
                 ],
+                vec![1, 2, 3],
                 UnionMode::Sparse,
             ),
             DataType::Dictionary(
@@ -720,6 +722,7 @@ mod roundtrip_tests {
                     Field::new("name", DataType::Utf8, false),
                     Field::new("datatype", DataType::Binary, false),
                 ],
+                vec![7, 5, 3],
                 UnionMode::Sparse,
             ),
             DataType::Union(
@@ -737,6 +740,7 @@ mod roundtrip_tests {
                         true,
                     ),
                 ],
+                vec![5, 8, 1],
                 UnionMode::Dense,
             ),
             DataType::Dictionary(
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index 7aa4278b3..70c4d57c4 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -200,17 +200,15 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
                     .map(|field| field.into())
                     .collect::<Vec<_>>(),
             }),
-            DataType::Union(union_types, union_mode) => {
+            DataType::Union(union_types, type_ids, union_mode) => {
                 let union_mode = match union_mode {
                     UnionMode::Sparse => protobuf::UnionMode::Sparse,
                     UnionMode::Dense => protobuf::UnionMode::Dense,
                 };
                 Self::Union(protobuf::Union {
-                    union_types: union_types
-                        .iter()
-                        .map(|field| field.into())
-                        .collect::<Vec<_>>(),
+                    union_types: union_types.iter().map(Into::into).collect(),
                     union_mode: union_mode.into(),
+                    type_ids: type_ids.iter().map(|x| *x as i32).collect(),
                 })
             }
             DataType::Dictionary(key_type, value_type) => {
@@ -1188,7 +1186,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
             | DataType::FixedSizeList(_, _)
             | DataType::LargeList(_)
             | DataType::Struct(_)
-            | DataType::Union(_, _)
+            | DataType::Union(_, _, _)
             | DataType::Dictionary(_, _)
             | DataType::Map(_, _)
             | DataType::Decimal(_, _) => {
diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml
index f9a150699..bb755ecea 100644
--- a/datafusion/row/Cargo.toml
+++ b/datafusion/row/Cargo.toml
@@ -37,7 +37,7 @@ path = "src/lib.rs"
 jit = ["datafusion-jit"]
 
 [dependencies]
-arrow = { version = "14.0.0" }
+arrow = { version = "15.0.0" }
 datafusion-common = { path = "../common", version = "8.0.0" }
 datafusion-jit = { path = "../jit", version = "8.0.0", optional = true }
 paste = "^1.0"
diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml
index 673823c37..cd2172a65 100644
--- a/datafusion/sql/Cargo.toml
+++ b/datafusion/sql/Cargo.toml
@@ -38,7 +38,7 @@ unicode_expressions = []
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 datafusion-common = { path = "../common", version = "8.0.0" }
 datafusion-expr = { path = "../expr", version = "8.0.0" }
 hashbrown = "0.12"
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 8c2e687f3..12b6e9bc0 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null
 
 # clone the repo
 # TODO make repo/branch configurable
-git clone https://github.com/tustvold/arrow-ballista -b url-refactor
+git clone https://github.com/tustvold/arrow-ballista -b arrow-15
 
 # update dependencies to local crates
 python ./dev/make-ballista-deps-local.py

Reply via email to