This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new abeec01c update latest datafusion. (#175)
abeec01c is described below

commit abeec01cf6040c1f097a77d8d0ea99280b2be68e
Author: Yang Jiang <[email protected]>
AuthorDate: Wed Aug 31 00:20:31 2022 +0800

    update latest datafusion. (#175)
---
 .github/workflows/python_build.yml                 | 11 ++++++
 .github/workflows/python_test.yaml                 | 13 ++++++
 .github/workflows/rust.yml                         | 46 ++++++++++++++++++++++
 ballista-cli/Cargo.toml                            |  4 +-
 ballista/rust/client/Cargo.toml                    |  6 +--
 ballista/rust/core/Cargo.toml                      | 18 ++++-----
 ballista/rust/core/proto/datafusion.proto          |  8 +++-
 .../core/src/execution_plans/shuffle_writer.rs     | 12 +++---
 .../core/src/serde/physical_plan/from_proto.rs     |  5 +--
 ballista/rust/core/src/serde/physical_plan/mod.rs  |  7 +++-
 ballista/rust/core/src/serde/scheduler/mod.rs      | 14 +++----
 ballista/rust/executor/Cargo.toml                  | 10 ++---
 ballista/rust/scheduler/Cargo.toml                 | 14 ++++---
 ballista/rust/scheduler/src/lib.rs                 |  1 +
 ballista/rust/scheduler/src/main.rs                | 20 ++++++----
 benchmarks/Cargo.toml                              |  4 +-
 examples/Cargo.toml                                |  6 +--
 python/Cargo.toml                                  |  2 +-
 python/src/functions.rs                            |  2 +-
 python/src/udaf.rs                                 | 12 ++++--
 20 files changed, 153 insertions(+), 62 deletions(-)

diff --git a/.github/workflows/python_build.yml 
b/.github/workflows/python_build.yml
index 45f3adde..89b101bc 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -94,6 +94,16 @@ jobs:
     steps:
       - uses: actions/checkout@v2
       - run: rm LICENSE.txt
+      - name: Install protobuf compiler
+        shell: bash
+        run: |
+          mkdir -p $HOME/d/protoc
+          cd $HOME/d/protoc
+          export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+          curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+          unzip $PROTO_ZIP
+          export PATH=$PATH:$HOME/d/protoc/bin
+          protoc --version
       - name: Download LICENSE.txt
         uses: actions/download-artifact@v2
         with:
@@ -102,6 +112,7 @@ jobs:
       - run: cat LICENSE.txt
       - name: Build wheels
         run: |
+          export PATH=$PATH:$HOME/d/protoc/bin
           export RUSTFLAGS='-C target-cpu=skylake'
           docker run --rm -v $(pwd)/..:/io \
             --workdir /io/python \
diff --git a/.github/workflows/python_test.yaml 
b/.github/workflows/python_test.yaml
index f3768e1e..73302da8 100644
--- a/.github/workflows/python_test.yaml
+++ b/.github/workflows/python_test.yaml
@@ -23,6 +23,16 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2
+      - name: Install protobuf compiler
+        shell: bash
+        run: |
+          mkdir -p $HOME/d/protoc
+          cd $HOME/d/protoc
+          export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+          curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+          unzip $PROTO_ZIP
+          export PATH=$PATH:$HOME/d/protoc/bin
+          protoc --version
       - name: Setup Rust toolchain
         run: |
           rustup component add rustfmt
@@ -41,6 +51,7 @@ jobs:
           python-version: "3.10"
       - name: Create Virtualenv
         run: |
+          export PATH=$PATH:$HOME/d/protoc/bin
           python -m venv venv
           source venv/bin/activate
           pip install -r python/requirements.txt
@@ -51,6 +62,8 @@ jobs:
           black --line-length 79 --diff --check python
       - name: Run tests
         run: |
+          export PATH=$PATH:$HOME/d/protoc/bin
+          protoc --version
           source venv/bin/activate
           cd python
           maturin develop
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index e8ffab29..b6f9ac73 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -89,6 +89,16 @@ jobs:
       - uses: actions/checkout@v2
         with:
           submodules: true
+      - name: Install protobuf compiler
+        shell: bash
+        run: |
+          mkdir -p $HOME/d/protoc
+          cd $HOME/d/protoc
+          export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+          curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+          unzip $PROTO_ZIP
+          export PATH=$PATH:$HOME/d/protoc/bin
+          protoc --version
       - name: Cache Cargo
         uses: actions/cache@v2
         with:
@@ -107,6 +117,7 @@ jobs:
           rust-version: ${{ matrix.rust }}
       - name: Run tests
         run: |
+          export PATH=$PATH:$HOME/d/protoc/bin
           export ARROW_TEST_DATA=$(pwd)/testing/data
           export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
           cargo test
@@ -135,6 +146,16 @@ jobs:
       - uses: actions/checkout@v2
         with:
           submodules: true
+      - name: Install protobuf compiler
+        shell: bash
+        run: |
+          mkdir -p $HOME/d/protoc
+          cd $HOME/d/protoc
+          export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+          curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+          unzip $PROTO_ZIP
+          export PATH=$PATH:$HOME/d/protoc/bin
+          protoc --version
       - name: Cache Cargo
         uses: actions/cache@v2
         with:
@@ -154,6 +175,7 @@ jobs:
       # Ballista is currently not part of the main workspace so requires a 
separate test step
       - name: Run Ballista tests
         run: |
+          export PATH=$PATH:$HOME/d/protoc/bin
           export ARROW_TEST_DATA=$(pwd)/testing/data
           export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
           cd ballista/rust
@@ -176,6 +198,29 @@ jobs:
       - uses: actions/checkout@v2
         with:
           submodules: true
+      - name: Install protobuf macos compiler
+        shell: bash
+        run: |
+          mkdir -p $HOME/d/protoc
+          cd $HOME/d/protoc
+          export PROTO_ZIP="protoc-21.4-osx-x86_64.zip"
+          curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+          unzip $PROTO_ZIP
+          echo "$HOME/d/protoc/bin" >> $GITHUB_PATH
+          export PATH=$PATH:$HOME/d/protoc/bin
+          protoc --version
+        if: ${{matrix.os == 'macos-latest'}}
+      - name: Install protobuf windows compiler
+        shell: bash
+        run: |
+          mkdir -p $HOME/d/protoc
+          cd $HOME/d/protoc
+          export PROTO_ZIP="protoc-21.4-win64.zip"
+          curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+          unzip $PROTO_ZIP
+          export PATH=$PATH:$HOME/d/protoc/bin
+          protoc.exe --version
+        if: ${{matrix.os == 'windows-latest'}}
       # TODO: this won't cache anything, which is expensive. Setup this action
       # with a OS-dependent path.
       - name: Setup Rust toolchain
@@ -186,6 +231,7 @@ jobs:
       - name: Run tests
         shell: bash
         run: |
+          export PATH=$PATH:$HOME/d/protoc/bin
           export ARROW_TEST_DATA=$(pwd)/testing/data
           export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
           cargo test
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 11c2e772..8fdad9eb 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 9cf4eb33..ff9165b4 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.7.0" }
 ballista-executor = { path = "../executor", version = "0.7.0", optional = true 
}
 ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = 
true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
-sqlparser = "0.18"
+sqlparser = "0.22"
 tempfile = "3"
 tokio = "1.0"
 
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 820eb163..c32cac07 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,28 +35,28 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }
 
-arrow-flight = { version = "18.0.0" }
+arrow-flight = { version = "20.0.0" }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
 futures = "0.3"
 hashbrown = "0.12"
 
 libloading = "0.7.3"
 log = "0.4"
-object_store = "0.3.0"
+object_store = "0.4.0"
 once_cell = "1.9.0"
 
 parking_lot = "0.12"
 parse_arg = "0.1.3"
-prost = "0.10"
-prost-types = "0.10"
+prost = "0.11"
+prost-types = "0.11"
 serde = { version = "1", features = ["derive"] }
-sqlparser = "0.18"
+sqlparser = "0.22"
 tokio = "1.0"
-tonic = "0.7"
+tonic = "0.8"
 uuid = { version = "1.0", features = ["v4"] }
 walkdir = "2.3.2"
 
@@ -65,4 +65,4 @@ tempfile = "3"
 
 [build-dependencies]
 rustc_version = "0.4.0"
-tonic-build = { version = "0.7" }
+tonic-build = { version = "0.8", default-features = false, features = ["transport", "prost"] }
diff --git a/ballista/rust/core/proto/datafusion.proto 
b/ballista/rust/core/proto/datafusion.proto
index 09c97029..10d012e9 100644
--- a/ballista/rust/core/proto/datafusion.proto
+++ b/ballista/rust/core/proto/datafusion.proto
@@ -438,6 +438,10 @@ enum ScalarFunction {
   Coalesce=63;
   Power=64;
   StructFun=65;
+  FromUnixtime=66;
+  Atan2=67;
+  DateBin=68;
+  ArrowTypeof=69;
 }
 
 message ScalarFunctionNode {
@@ -464,11 +468,13 @@ enum AggregateFunction {
   APPROX_MEDIAN=15;
   APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
   GROUPING = 17;
+  MEDIAN=18;
 }
 
 message AggregateExprNode {
   AggregateFunction aggr_function = 1;
   repeated LogicalExprNode expr = 2;
+  bool distinct = 3;
 }
 
 message AggregateUDFExprNode {
@@ -654,7 +660,7 @@ message Union{
 }
 
 message ScalarListValue{
-  ScalarType datatype = 1;
+  Field field = 1;
   repeated ScalarValue values = 2;
 }
 
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs 
b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 45a10218..a1302111 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -354,11 +354,11 @@ impl ExecutionPlan for ShuffleWriterExec {
                 let mut num_bytes_builder = UInt64Builder::new(num_writers);
 
                 for loc in &part_loc {
-                    path_builder.append_value(loc.path.clone())?;
-                    partition_builder.append_value(loc.partition_id as u32)?;
-                    num_rows_builder.append_value(loc.num_rows)?;
-                    num_batches_builder.append_value(loc.num_batches)?;
-                    num_bytes_builder.append_value(loc.num_bytes)?;
+                    path_builder.append_value(loc.path.clone());
+                    partition_builder.append_value(loc.partition_id as u32);
+                    num_rows_builder.append_value(loc.num_rows);
+                    num_batches_builder.append_value(loc.num_batches);
+                    num_bytes_builder.append_value(loc.num_bytes);
                 }
 
                 // build arrays
@@ -374,7 +374,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                     field_builders,
                 );
                 for _ in 0..num_writers {
-                    stats_builder.append(true)?;
+                    stats_builder.append(true);
                 }
                 let stats = Arc::new(stats_builder.finish());
 
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6268ab28..6abb5f12 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -68,9 +68,7 @@ pub(crate) fn parse_physical_expr(
             let pcol: Column = c.into();
             Arc::new(pcol)
         }
-        ExprType::Literal(scalar) => {
-            Arc::new(Literal::new(convert_required!(scalar.value)?))
-        }
+        ExprType::Literal(scalar) => 
Arc::new(Literal::new(scalar.try_into()?)),
         ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
             parse_required_physical_box_expr(
                 &binary_expr.l,
@@ -312,6 +310,7 @@ impl TryFrom<&protobuf::PartitionedFile> for 
PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+            extensions: None,
         })
     }
 }
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs 
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index b80e64b1..95627f6d 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -168,6 +168,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 Ok(Arc::new(ParquetExec::new(
                     decode_scan_config(scan.base_conf.as_ref().unwrap())?,
                     predicate,
+                    None,
                 )))
             }
             PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
@@ -1462,7 +1463,11 @@ mod roundtrip_tests {
         };
 
         let predicate = 
datafusion::prelude::col("col").eq(datafusion::prelude::lit("1"));
-        roundtrip_test(Arc::new(ParquetExec::new(scan_config, 
Some(predicate))))
+        roundtrip_test(Arc::new(ParquetExec::new(
+            scan_config,
+            Some(predicate),
+            None,
+        )))
     }
 
     #[test]
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs 
b/ballista/rust/core/src/serde/scheduler/mod.rs
index 38b350d6..6f5a61d5 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -157,28 +157,28 @@ impl PartitionStats {
 
         let mut num_rows_builder = UInt64Builder::new(1);
         match self.num_rows {
-            Some(n) => num_rows_builder.append_value(n)?,
-            None => num_rows_builder.append_null()?,
+            Some(n) => num_rows_builder.append_value(n),
+            None => num_rows_builder.append_null(),
         }
         field_builders.push(Box::new(num_rows_builder) as Box<dyn 
ArrayBuilder>);
 
         let mut num_batches_builder = UInt64Builder::new(1);
         match self.num_batches {
-            Some(n) => num_batches_builder.append_value(n)?,
-            None => num_batches_builder.append_null()?,
+            Some(n) => num_batches_builder.append_value(n),
+            None => num_batches_builder.append_null(),
         }
         field_builders.push(Box::new(num_batches_builder) as Box<dyn 
ArrayBuilder>);
 
         let mut num_bytes_builder = UInt64Builder::new(1);
         match self.num_bytes {
-            Some(n) => num_bytes_builder.append_value(n)?,
-            None => num_bytes_builder.append_null()?,
+            Some(n) => num_bytes_builder.append_value(n),
+            None => num_bytes_builder.append_null(),
         }
         field_builders.push(Box::new(num_bytes_builder) as Box<dyn 
ArrayBuilder>);
 
         let mut struct_builder =
             StructBuilder::new(self.arrow_struct_fields(), field_builders);
-        struct_builder.append(true)?;
+        struct_builder.append(true);
         Ok(Arc::new(struct_builder.finish()))
     }
 
diff --git a/ballista/rust/executor/Cargo.toml 
b/ballista/rust/executor/Cargo.toml
index cd031b20..f35b3543 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -34,14 +34,14 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "18.0.0" }
-arrow-flight = { version = "18.0.0" }
+arrow = { version = "20.0.0" }
+arrow-flight = { version = "20.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
@@ -50,7 +50,7 @@ snmalloc-rs = { version = "0.3", optional = true }
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", 
"parking_lot", "signal"] }
 tokio-stream = { version = "0.1", features = ["net"] }
-tonic = "0.7"
+tonic = "0.8"
 tracing = "0.1.36"
 tracing-appender = "0.2.2"
 tracing-subscriber = { version = "0.3.15", features = ["fmt", "env-filter", 
"ansi"] }
diff --git a/ballista/rust/scheduler/Cargo.toml 
b/ballista/rust/scheduler/Cargo.toml
index 9f098038..8b4f6bd9 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -32,8 +32,10 @@ scheduler = "scheduler_config_spec.toml"
 [features]
 default = ["etcd", "sled"]
 etcd = ["etcd-client"]
+flight-sql = []
 sled = ["sled_package", "tokio-stream"]
 
+
 [dependencies]
 anyhow = "1"
 arrow-flight = { version = "18.0.0", features = ["flight-sql-experimental"] }
@@ -42,8 +44,8 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
 etcd-client = { version = "0.9", optional = true }
 flatbuffers = { version = "2.1.2" }
 futures = "0.3"
@@ -52,16 +54,16 @@ http-body = "0.4"
 hyper = "0.14.4"
 itertools = "0.10.3"
 log = "0.4"
-object_store = "0.3.0"
+object_store = "0.4.0"
 parking_lot = "0.12"
 parse_arg = "0.1.3"
-prost = "0.10"
+prost = "0.11"
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
 sled_package = { package = "sled", version = "0.34", optional = true }
 tokio = { version = "1.0", features = ["full"] }
 tokio-stream = { version = "0.1", features = ["net"], optional = true }
-tonic = "0.7"
+tonic = "0.8"
 tower = { version = "0.4" }
 tracing = "0.1.36"
 tracing-appender = "0.2.2"
@@ -74,4 +76,4 @@ ballista-core = { path = "../core", version = "0.7.0" }
 
 [build-dependencies]
 configure_me_codegen = "0.4.1"
-tonic-build = { version = "0.7" }
+tonic-build = { version = "0.8", default-features = false, features = ["transport", "prost"] }
diff --git a/ballista/rust/scheduler/src/lib.rs 
b/ballista/rust/scheduler/src/lib.rs
index 838eb562..d755bc68 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -25,6 +25,7 @@ pub mod scheduler_server;
 pub mod standalone;
 pub mod state;
 
+#[cfg(feature = "flight-sql")]
 pub mod flight_sql;
 #[cfg(test)]
 pub mod test_utils;
diff --git a/ballista/rust/scheduler/src/main.rs 
b/ballista/rust/scheduler/src/main.rs
index d393e4eb..50bfe85c 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -18,6 +18,7 @@
 //! Ballista Rust scheduler binary.
 
 use anyhow::{Context, Result};
+#[cfg(feature = "flight-sql")]
 use arrow_flight::flight_service_server::FlightServiceServer;
 use 
ballista_scheduler::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
 use futures::future::{self, Either, TryFutureExt};
@@ -60,6 +61,7 @@ mod config {
 }
 
 use ballista_core::utils::create_grpc_server;
+#[cfg(feature = "flight-sql")]
 use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
 use config::prelude::*;
 use datafusion::execution::context::default_session_builder;
@@ -102,17 +104,19 @@ async fn start_server(
             let scheduler_grpc_server =
                 SchedulerGrpcServer::new(scheduler_server.clone());
 
-            let flight_sql_server = 
FlightServiceServer::new(FlightSqlServiceImpl::new(
-                scheduler_server.clone(),
-            ));
-
             let keda_scaler = 
ExternalScalerServer::new(scheduler_server.clone());
 
-            let mut tonic = create_grpc_server()
+            let tonic_builder = create_grpc_server()
                 .add_service(scheduler_grpc_server)
-                .add_service(flight_sql_server)
-                .add_service(keda_scaler)
-                .into_service();
+                .add_service(keda_scaler);
+
+            #[cfg(feature = "flight-sql")]
+            tonic_builder.add_service(FlightServiceServer::new(
+                FlightSqlServiceImpl::new(scheduler_server.clone()),
+            ));
+
+            let mut tonic = tonic_builder.into_service();
+
             let mut warp = warp::service(get_routes(scheduler_server.clone()));
 
             let connect_info = request.connect_info();
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index d2797322..937a0df6 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index c20194ac..10528a28 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,9 +35,9 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736" }
 futures = "0.3"
 num_cpus = "1.13.0"
-prost = "0.10"
+prost = "0.11"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", 
"sync", "parking_lot"] }
-tonic = "0.7"
+tonic = "0.8"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 01e3b5b6..fd5779ea 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
 [dependencies]
 async-trait = "0.1"
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "7aed4d697fa24053d515babfd7678855451c6736", features = ["pyarrow"] }
 futures = "0.3"
 mimalloc = { version = "*", optional = true, default-features = false }
 pyo3 = { version = "~0.16.5", features = ["extension-module", "abi3", 
"abi3-py37"] }
diff --git a/python/src/functions.rs b/python/src/functions.rs
index 44b294d6..3d86867d 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -46,7 +46,7 @@ fn in_list(expr: PyExpr, value: Vec<PyExpr>, negated: bool) 
-> PyExpr {
 fn now() -> PyExpr {
     PyExpr {
         // here lit(0) is a stub for conform to arity
-        expr: logical_plan::now(logical_plan::lit(0)),
+        expr: logical_plan::now(),
     }
 }
 
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 1fdf0d0d..ff26078e 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -25,7 +25,7 @@ use datafusion::arrow::pyarrow::PyArrowConvert;
 use datafusion::common::ScalarValue;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{
-    Accumulator, AccumulatorFunctionImplementation, AggregateUDF,
+    Accumulator, AccumulatorFunctionImplementation, AggregateState, 
AggregateUDF,
 };
 use datafusion::logical_plan;
 
@@ -44,9 +44,13 @@ impl RustAccumulator {
 }
 
 impl Accumulator for RustAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
-        Python::with_gil(|py| 
self.accum.as_ref(py).call_method0("state")?.extract())
-            .map_err(|e| DataFusionError::Execution(format!("{}", e)))
+    fn state(&self) -> Result<Vec<AggregateState>> {
+        let py_result: PyResult<Vec<ScalarValue>> =
+            Python::with_gil(|py| 
self.accum.as_ref(py).call_method0("state")?.extract());
+        match py_result {
+            Ok(r) => Ok(r.into_iter().map(AggregateState::Scalar).collect()),
+            Err(e) => Err(DataFusionError::Execution(format!("{}", e))),
+        }
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {

Reply via email to