This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 1aec2c9 fix: Fix arrow error when sorting on empty batch (#271)
1aec2c9 is described below
commit 1aec2c9403b6ced1dfb740e8c41af7617b9b02b8
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Tue Apr 16 11:38:46 2024 -0700
fix: Fix arrow error when sorting on empty batch (#271)
* fix: Fix arrow error when sorting on empty batch
* Add RecordBatchOptions for Expand
---
core/Cargo.lock | 20 ++++++++++----------
core/Cargo.toml | 8 ++++----
core/src/execution/datafusion/operators/expand.rs | 6 ++++--
core/src/execution/operators/copy.rs | 7 +++++--
.../org/apache/comet/exec/CometAggregateSuite.scala | 21 +++++++++++++++++++++
5 files changed, 44 insertions(+), 18 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index e209e4a..3fb7b5f 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -825,7 +825,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"ahash",
"arrow",
@@ -867,7 +867,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"ahash",
"arrow",
@@ -886,7 +886,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"tokio",
]
@@ -894,7 +894,7 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"arrow",
"chrono",
@@ -914,7 +914,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"ahash",
"arrow",
@@ -930,7 +930,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"arrow",
"base64",
@@ -954,7 +954,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"arrow",
"async-trait",
@@ -971,7 +971,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"ahash",
"arrow",
@@ -1005,7 +1005,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"ahash",
"arrow",
@@ -1035,7 +1035,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "36.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=111a940#111a940b297aa83839e4e2273f0e1a38e108b370"
+source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0"
dependencies = [
"arrow",
"arrow-array",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 880d18d..5d16049 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -66,10 +66,10 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
-datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git",
rev = "111a940" }
-datafusion = { default-features = false, git =
"https://github.com/viirya/arrow-datafusion.git", rev = "111a940", features =
["unicode_expressions"] }
-datafusion-functions = { git =
"https://github.com/viirya/arrow-datafusion.git", rev = "111a940" }
-datafusion-physical-expr = { git =
"https://github.com/viirya/arrow-datafusion.git", rev = "111a940",
default-features = false, features = ["unicode_expressions"] }
+datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git",
rev = "57b3be4" }
+datafusion = { default-features = false, git =
"https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features =
["unicode_expressions"] }
+datafusion-functions = { git =
"https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4" }
+datafusion-physical-expr = { git =
"https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4",
default-features = false, features = ["unicode_expressions"] }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
diff --git a/core/src/execution/datafusion/operators/expand.rs
b/core/src/execution/datafusion/operators/expand.rs
index 5cf444b..ca3fdc1 100644
--- a/core/src/execution/datafusion/operators/expand.rs
+++ b/core/src/execution/datafusion/operators/expand.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use arrow_array::RecordBatch;
+use arrow_array::{RecordBatch, RecordBatchOptions};
use arrow_schema::SchemaRef;
use datafusion::{
execution::TaskContext,
@@ -169,7 +169,9 @@ impl ExpandStream {
Ok::<(), DataFusionError>(())
})?;
- RecordBatch::try_new(self.schema.clone(), columns).map_err(|e|
e.into())
+ let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+ RecordBatch::try_new_with_options(self.schema.clone(), columns,
&options)
+ .map_err(|e| e.into())
}
}
diff --git a/core/src/execution/operators/copy.rs
b/core/src/execution/operators/copy.rs
index 292271f..96c2449 100644
--- a/core/src/execution/operators/copy.rs
+++ b/core/src/execution/operators/copy.rs
@@ -24,7 +24,7 @@ use std::{
use futures::{Stream, StreamExt};
-use arrow_array::{ArrayRef, RecordBatch};
+use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
@@ -149,7 +149,10 @@ impl CopyStream {
.iter()
.map(|v| copy_or_cast_array(v))
.collect::<Result<Vec<ArrayRef>, _>>()?;
- RecordBatch::try_new(self.schema.clone(), vectors).map_err(|e|
arrow_datafusion_err!(e))
+
+ let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+ RecordBatch::try_new_with_options(self.schema.clone(), vectors,
&options)
+ .map_err(|e| arrow_datafusion_err!(e))
}
}
diff --git
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 230ac36..b5ed5f4 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -40,6 +40,27 @@ import
org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._
+ test("lead/lag should return the default value if the offset row does not
exist") {
+ withSQLConf(
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ checkSparkAnswer(sql("""
+ |SELECT
+ | lag(123, 100, 321) OVER (ORDER BY id) as lag,
+ | lead(123, 100, 321) OVER (ORDER BY id) as lead
+ |FROM (SELECT 1 as id) tmp
+ """.stripMargin))
+
+ checkSparkAnswer(sql("""
+ |SELECT
+ | lag(123, 100, a) OVER (ORDER BY id) as lag,
+ | lead(123, 100, a) OVER (ORDER BY id) as lead
+ |FROM (SELECT 1 as id, 2 as a) tmp
+ """.stripMargin))
+ }
+ }
+
test("multiple column distinct count") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",