This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 57d7777  Update DataFusion to arrow 6.0 (#984)
57d7777 is described below

commit 57d7777fc0ce94e783a7f447631624c354b0b906
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Oct 19 05:58:18 2021 -0400

    Update DataFusion to arrow 6.0 (#984)
    
    * Update to arrow 6.0
    
    * Update empty result
    
    * Fix up sort_preserving_merge test
    
    * Use column c12, per @yjshen
    
    * Add minimal ballista support
    
    * Update to use new arrow apis
    
    * Fixup sort preserving merge test again
    
    * make sort deterministic
    
    * less whitespace
    
    * patch up avro
---
 ballista-examples/Cargo.toml                         |  2 +-
 ballista/rust/core/Cargo.toml                        |  2 +-
 .../rust/core/src/serde/logical_plan/to_proto.rs     |  4 ++++
 ballista/rust/executor/Cargo.toml                    |  4 ++--
 datafusion-cli/Cargo.toml                            |  2 +-
 datafusion-examples/Cargo.toml                       |  2 +-
 datafusion/Cargo.toml                                |  4 ++--
 datafusion/src/avro_to_arrow/arrow_array_reader.rs   |  8 ++++++--
 datafusion/src/avro_to_arrow/schema.rs               |  1 +
 datafusion/src/execution/context.rs                  |  7 ++++++-
 datafusion/src/physical_plan/expressions/in_list.rs  | 20 +++++++++++---------
 datafusion/src/physical_plan/hash_join.rs            |  6 ++++--
 .../src/physical_plan/sort_preserving_merge.rs       | 18 +++++++++++++++---
 13 files changed, 55 insertions(+), 25 deletions(-)

diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml
index e0989b9..55a5916 100644
--- a/ballista-examples/Cargo.toml
+++ b/ballista-examples/Cargo.toml
@@ -28,7 +28,7 @@ edition = "2018"
 publish = false
 
 [dependencies]
-arrow-flight = { version = "^5.3" }
+arrow-flight = { version = "6.0.0" }
 datafusion = { path = "../datafusion" }
 ballista = { path = "../ballista/rust/client" }
 prost = "0.8"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index b3a4e42..4518e77 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -43,7 +43,7 @@ tonic = "0.5"
 uuid = { version = "0.8", features = ["v4"] }
 chrono = "0.4"
 
-arrow-flight = { version = "^5.3"  }
+arrow-flight = { version = "6.0.0"  }
 
 datafusion = { path = "../../../datafusion", version = "5.1.0" }
 
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index ba7daca..e79e654 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -360,6 +360,9 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
                     fractional: *fractional as u64,
                 })
             }
+            DataType::Map(_, _) => {
+                unimplemented!("Ballista does not yet support Map data type")
+            }
         }
     }
 }
@@ -490,6 +493,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
             | DataType::Struct(_)
             | DataType::Union(_)
             | DataType::Dictionary(_, _)
+            | DataType::Map(_, _)
             | DataType::Decimal(_, _) => {
                 return Err(proto_error(format!(
                     "Error converting to Datatype to scalar type, {:?} is invalid as a datafusion scalar.",
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index c6e0ab8..231b05f 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -29,8 +29,8 @@ edition = "2018"
 snmalloc = ["snmalloc-rs"]
 
 [dependencies]
-arrow = { version = "^5.3"  }
-arrow-flight = { version = "^5.3"  }
+arrow = { version = "6.0.0"  }
+arrow-flight = { version = "6.0.0"  }
 anyhow = "1"
 async-trait = "0.1.36"
 ballista-core = { path = "../core", version = "0.6.0" }
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 3b9be67..22196ca 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -31,5 +31,5 @@ clap = "2.33"
 rustyline = "8.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 datafusion = { path = "../datafusion", version = "5.1.0" }
-arrow = { version = "^5.3"  }
+arrow = { version = "6.0.0"  }
 ballista = { path = "../ballista/rust/client", version = "0.6.0" }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 113cd5b..9f151f0 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -33,7 +33,7 @@ path = "examples/avro_sql.rs"
 required-features = ["datafusion/avro"]
 
 [dev-dependencies]
-arrow-flight = { version = "^5.3" }
+arrow-flight = { version = "6.0.0" }
 datafusion = { path = "../datafusion" }
 prost = "0.8"
 tonic = "0.5"
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 4fa6461..6e8b521 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -50,8 +50,8 @@ avro = ["avro-rs", "num-traits"]
 [dependencies]
 ahash = "0.7"
 hashbrown = { version = "0.11", features = ["raw"] }
-arrow = { version = "^5.3", features = ["prettyprint"] }
-parquet = { version = "^5.3", features = ["arrow"] }
+arrow = { version = "6.0.0", features = ["prettyprint"] }
+parquet = { version = "6.0.0", features = ["arrow"] }
 sqlparser = "0.12"
 paste = "^1.0"
 num_cpus = "1.13.0"
diff --git a/datafusion/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
index cc8ed8e..bb863d6 100644
--- a/datafusion/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
@@ -484,6 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     .add_buffer(bool_values.into())
                     .null_bit_buffer(bool_nulls.into())
                     .build()
+                    .unwrap()
             }
             DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
             DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
@@ -569,6 +570,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     .null_bit_buffer(buf)
                     .child_data(arrays.into_iter().map(|a| a.data().clone()).collect())
                     .build()
+                    .unwrap()
             }
             datatype => {
                 return Err(ArrowError::SchemaError(format!(
@@ -583,7 +585,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
             .add_buffer(Buffer::from_slice_ref(&offsets))
             .add_child_data(array_data)
             .null_bit_buffer(list_nulls.into())
-            .build();
+            .build()
+            .unwrap();
         Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
     }
 
@@ -776,7 +779,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                             .child_data(
                                 arrays.into_iter().map(|a| a.data().clone()).collect(),
                             )
-                            .build();
+                            .build()
+                            .unwrap();
                         Ok(make_array(data))
                     }
                     _ => Err(ArrowError::SchemaError(format!(
diff --git a/datafusion/src/avro_to_arrow/schema.rs b/datafusion/src/avro_to_arrow/schema.rs
index c2927f0..c6eda80 100644
--- a/datafusion/src/avro_to_arrow/schema.rs
+++ b/datafusion/src/avro_to_arrow/schema.rs
@@ -213,6 +213,7 @@ fn default_field_name(dt: &DataType) -> &str {
         DataType::Struct(_) => "struct",
         DataType::Union(_) => "union",
         DataType::Dictionary(_, _) => "map",
+        DataType::Map(_, _) => unimplemented!("Map support not implemented"),
         DataType::Decimal(_, _) => "decimal",
     }
 }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 23667f5..627f7ca 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1970,7 +1970,12 @@ mod tests {
         let results =
             execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
 
-        let expected = vec!["++", "||", "++", "++"];
+        let expected = vec![
+            "+----+--------------+",
+            "| c1 | AVG(test.c2) |",
+            "+----+--------------+",
+            "+----+--------------+",
+        ];
         assert_batches_sorted_eq!(expected, &results);
 
         Ok(())
diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs
index 00767c7..826ffa8 100644
--- a/datafusion/src/physical_plan/expressions/in_list.rs
+++ b/datafusion/src/physical_plan/expressions/in_list.rs
@@ -47,15 +47,17 @@ macro_rules! compare_op_scalar {
         // same as $left.len()
         let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) };
 
-        let data = ArrayData::new(
-            DataType::Boolean,
-            $left.len(),
-            None,
-            null_bit_buffer,
-            0,
-            vec![Buffer::from(buffer)],
-            vec![],
-        );
+        let data = unsafe {
+            ArrayData::new_unchecked(
+                DataType::Boolean,
+                $left.len(),
+                None,
+                null_bit_buffer,
+                0,
+                vec![Buffer::from(buffer)],
+                vec![],
+            )
+        };
         Ok(BooleanArray::from(data))
     }};
 }
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 2a14093..2ed0faa 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -678,11 +678,13 @@ fn build_join_indexes(
             let left = ArrayData::builder(DataType::UInt64)
                 .len(left_indices.len())
                 .add_buffer(left_indices.finish())
-                .build();
+                .build()
+                .unwrap();
             let right = ArrayData::builder(DataType::UInt32)
                 .len(right_indices.len())
                 .add_buffer(right_indices.finish())
-                .build();
+                .build()
+                .unwrap();
 
             Ok((
                 PrimitiveArray::<UInt64Type>::from(left),
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs
index 0b75f46..5aaf978 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -963,6 +963,10 @@ mod tests {
                 expr: col("c7", &schema).unwrap(),
                 options: SortOptions::default(),
             },
+            PhysicalSortExpr {
+                expr: col("c12", &schema).unwrap(),
+                options: SortOptions::default(),
+            },
         ];
 
         let basic = basic_sort(csv.clone(), sort.clone()).await;
@@ -971,7 +975,11 @@ mod tests {
         let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
         let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
 
-        assert_eq!(basic, partition);
+        assert_eq!(
+            basic, partition,
+            "basic:\n\n{}\n\npartition:\n\n{}\n\n",
+            basic, partition
+        );
     }
 
     // Split the provided record batch into multiple batch_size record batches
@@ -1183,7 +1191,7 @@ mod tests {
     async fn test_async() {
         let schema = test::aggr_test_schema();
         let sort = vec![PhysicalSortExpr {
-            expr: col("c7", &schema).unwrap(),
+            expr: col("c12", &schema).unwrap(),
             options: SortOptions::default(),
         }];
 
@@ -1234,7 +1242,11 @@ mod tests {
         let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
         let partition = arrow::util::pretty::pretty_format_batches(&[merged]).unwrap();
 
-        assert_eq!(basic, partition);
+        assert_eq!(
+            basic, partition,
+            "basic:\n\n{}\n\npartition:\n\n{}\n\n",
+            basic, partition
+        );
     }
 
     #[tokio::test]

Reply via email to