This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 62ffffc3 deps: bump datafusion (#1445)
62ffffc3 is described below
commit 62ffffc3057b9159d7a67c4909a3c0e3fe670fc6
Author: Ruixiang Tan <[email protected]>
AuthorDate: Fri Feb 23 15:06:45 2024 +0800
deps: bump datafusion (#1445)
## Rationale
Close #1461
## Detailed Changes
Bump datafusion to https://github.com/CeresDB/arrow-datafusion/commits/e21b03154, which corresponds to upstream version 33.
Some important breaking changes (see the sketch after this list):
- https://github.com/apache/arrow-datafusion/pull/7920
- https://github.com/apache/arrow-datafusion/issues/9109
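Beyond the two linked changes, the upgrade's effects are visible throughout the plan snapshots in this diff: as of datafusion 33, `ExecutionPlan::statistics` returns a `Result<Statistics>` whose estimates are wrapped in a `Precision` enum (the source of the new `Rows=Absent, Bytes=Absent` output), and scan nodes print their output partitioning. A minimal sketch of the statistics shape, assuming the datafusion 33.0.0 API rather than anything in this repository:

```rust
// Hedged sketch, not HoraeDB code: the DataFusion 33 statistics model that
// produces the "statistics=[Rows=Absent, Bytes=Absent, ...]" text in the
// plan snapshots below. Type and field names follow datafusion 33.0.0.
use datafusion::common::stats::{ColumnStatistics, Precision, Statistics};

fn unknown_scan_statistics(num_columns: usize) -> Statistics {
    Statistics {
        num_rows: Precision::Absent,        // rendered as "Rows=Absent"
        total_byte_size: Precision::Absent, // rendered as "Bytes=Absent"
        // one unknown entry per projected column, shown as "(Col[0]:)" etc.
        column_statistics: vec![ColumnStatistics::new_unknown(); num_columns],
    }
}
```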
## Test Plan
CI
---------
Co-authored-by: jiacai2050 <[email protected]>
---
Cargo.lock | 549 ++++++++++++---------
Cargo.toml | 22 +-
.../cases/common/dml/issue-1087.result | 17 +-
.../cases/common/dml/issue-302.result | 2 +-
.../cases/common/dml/issue-341.result | 12 +-
integration_tests/cases/common/dml/issue-59.result | 4 +-
.../cases/common/explain/explain.result | 2 +-
.../cases/common/function/aggregate.result | 43 ++
.../cases/common/function/aggregate.sql | 28 ++
.../cases/common/optimizer/optimizer.result | 2 +-
.../cases/env/cluster/ddl/partition_table.result | 8 +-
.../cases/env/cluster/ddl/partition_table.sql | 4 +
.../cases/env/local/ddl/query-plan.result | 27 +-
.../cases/env/local/ddl/query-plan.sql | 9 +
.../src/instance/reorder_memtable.rs | 11 +-
src/analytic_engine/src/memtable/skiplist/iter.rs | 5 +
.../src/row_iter/record_batch_stream.rs | 1 +
src/analytic_engine/src/table/mod.rs | 1 +
src/common_types/src/datum.rs | 16 +-
src/common_types/src/projected_schema.rs | 6 +-
src/common_types/src/record_batch.rs | 58 ++-
src/components/parquet_ext/src/meta_data.rs | 3 +-
src/components/parquet_ext/src/prune/min_max.rs | 4 +-
.../src/dist_sql_query/physical_plan.rs | 18 +-
.../src/dist_sql_query/test_util.rs | 6 +-
src/df_operator/src/scalar.rs | 3 +-
src/df_operator/src/udaf.rs | 3 +-
src/interpreters/src/insert.rs | 2 +-
src/interpreters/src/tests.rs | 18 +-
src/proxy/src/grpc/prom_query.rs | 2 +-
src/proxy/src/influxdb/types.rs | 2 +-
src/query_engine/src/datafusion_impl/mod.rs | 4 +-
.../physical_optimizer/repartition.rs | 6 +-
.../physical_plan_extension/prom_align.rs | 18 +-
.../src/datafusion_impl/task_context.rs | 3 +-
src/query_frontend/src/influxql/planner.rs | 2 +-
src/query_frontend/src/logical_optimizer/mod.rs | 3 +-
.../src/logical_optimizer/type_conversion.rs | 9 +-
src/query_frontend/src/parser.rs | 14 +-
src/query_frontend/src/promql/convert.rs | 12 +-
src/query_frontend/src/promql/remote.rs | 2 +-
src/query_frontend/src/provider.rs | 2 +-
src/table_engine/src/memory.rs | 2 +-
src/table_engine/src/predicate.rs | 6 +-
src/table_engine/src/provider.rs | 83 +++-
src/table_engine/src/table.rs | 1 +
46 files changed, 685 insertions(+), 370 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 5f230330..f43a9036 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -90,7 +90,7 @@ version = "1.2.6-alpha"
dependencies = [
"arc-swap 1.6.0",
"arena",
- "arrow 43.0.0",
+ "arrow 49.0.0",
"async-stream",
"async-trait",
"atomic_enum",
@@ -120,7 +120,7 @@ dependencies = [
"parquet_ext",
"pin-project-lite",
"prometheus 0.12.0",
- "prost",
+ "prost 0.11.8",
"rand 0.7.3",
"remote_engine_client",
"router",
@@ -245,24 +245,24 @@ dependencies = [
[[package]]
name = "arrow"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2feeebd77b34b0bc88f224e06d01c27da4733997cc4789a4e056196656cdc59a"
+checksum = "5bc25126d18a012146a888a0298f2c22e1150327bd2765fc76d710a556b2d614"
dependencies = [
"ahash 0.8.3",
- "arrow-arith 43.0.0",
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-csv 43.0.0",
- "arrow-data 43.0.0",
- "arrow-ipc 43.0.0",
- "arrow-json 43.0.0",
- "arrow-ord 43.0.0",
- "arrow-row 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
- "arrow-string 43.0.0",
+ "arrow-arith 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-csv 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-ipc 49.0.0",
+ "arrow-json 49.0.0",
+ "arrow-ord 49.0.0",
+ "arrow-row 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
+ "arrow-string 49.0.0",
]
[[package]]
@@ -282,14 +282,14 @@ dependencies = [
[[package]]
name = "arrow-arith"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7173f5dc49c0ecb5135f52565af33afd3fdc9a12d13bd6f9973e8b96305e4b2e"
+checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7"
dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
"chrono",
"half 2.2.1",
"num",
@@ -313,14 +313,14 @@ dependencies = [
[[package]]
name = "arrow-array"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "63d7ea725f7d1f8bb2cffc53ef538557e95fc802e217d5be25122d402e22f3d0"
+checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d"
dependencies = [
"ahash 0.8.3",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
"chrono",
"chrono-tz",
"half 2.2.1",
@@ -340,10 +340,11 @@ dependencies = [
[[package]]
name = "arrow-buffer"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bdbe439e077f484e5000b9e1d47b5e4c0d15f2b311a8f5bcc682553d5d67a722"
+checksum = "01a0fc21915b00fc6c2667b069c1b64bdd920982f426079bc4a7cab86822886c"
dependencies = [
+ "bytes",
"half 2.2.1",
"num",
]
@@ -366,15 +367,16 @@ dependencies = [
[[package]]
name = "arrow-cast"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "93913cc14875770aa1eef5e310765e855effa352c094cb1c7c00607d0f37b4e1"
+checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a"
dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
+ "base64 0.21.0",
"chrono",
"comfy-table 7.0.1",
"half 2.2.1",
@@ -403,15 +405,15 @@ dependencies = [
[[package]]
name = "arrow-csv"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ef55b67c55ed877e6fe7b923121c19dae5e31ca70249ea2779a17b58fb0fbd9a"
+checksum = "2e09aa6246a1d6459b3f14baeaa49606cfdbca34435c46320e14054d244987ca"
dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
"chrono",
"csv",
"csv-core",
@@ -434,12 +436,12 @@ dependencies = [
[[package]]
name = "arrow-data"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d4f4f4a3c54614126a71ab91f6631c9743eb4643d6e9318b74191da9dc6e028b"
+checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634"
dependencies = [
- "arrow-buffer 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-schema 49.0.0",
"half 2.2.1",
"num",
]
@@ -460,15 +462,15 @@ dependencies = [
[[package]]
name = "arrow-ipc"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d41a3659f984a524ef1c2981d43747b24d8eec78e2425267fcd0ef34ce71cd18"
+checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd"
dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
"flatbuffers",
]
@@ -494,15 +496,15 @@ dependencies = [
[[package]]
name = "arrow-json"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10b95faa95a378f56ef32d84cc0104ea998c39ef7cd1faaa6b4cebf8ea92846d"
+checksum = "d82565c91fd627922ebfe2810ee4e8346841b6f9361b87505a9acea38b614fee"
dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
"chrono",
"half 2.2.1",
"indexmap 2.0.0",
@@ -529,15 +531,15 @@ dependencies = [
[[package]]
name = "arrow-ord"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c68549a4284d9f8b39586afb8d5ff8158b8f0286353a4844deb1d11cf1ba1f26"
+checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4"
dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
"half 2.2.1",
"num",
]
@@ -559,15 +561,15 @@ dependencies = [
[[package]]
name = "arrow-row"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a75a4a757afc301ce010adadff54d79d66140c4282ed3de565f6ccb716a5cf3"
+checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a"
dependencies = [
"ahash 0.8.3",
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
"half 2.2.1",
"hashbrown 0.14.0",
]
@@ -580,9 +582,9 @@ checksum = "bc85923d8d6662cc66ac6602c7d1876872e671002d60993dfdf492a6badeae92"
[[package]]
name = "arrow-schema"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2bebcb57eef570b15afbcf2d07d813eb476fde9f6dd69c81004d6476c197e87e"
+checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167"
[[package]]
name = "arrow-select"
@@ -599,14 +601,15 @@ dependencies = [
[[package]]
name = "arrow-select"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6e2943fa433a48921e914417173816af64eef61c0a3d448280e6c40a62df221"
+checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036"
dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "ahash 0.8.3",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
"num",
]
@@ -627,37 +630,37 @@ dependencies = [
[[package]]
name = "arrow-string"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbc92ed638851774f6d7af1ad900b92bc1486746497511868b4298fcbcfa35af"
+checksum = "a4a48149c63c11c9ff571e50ab8f017d2a7cb71037a882b42f6354ed2da9acc7"
dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
"num",
"regex",
- "regex-syntax 0.7.1",
+ "regex-syntax 0.8.2",
]
[[package]]
name = "arrow_ext"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"serde",
"snafu 0.6.10",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
]
[[package]]
name = "arrow_util"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
"ahash 0.8.3",
- "arrow 43.0.0",
+ "arrow 49.0.0",
"chrono",
"comfy-table 6.1.4",
"hashbrown 0.13.2",
@@ -682,8 +685,8 @@ dependencies = [
"pin-project-lite",
"tokio",
"xz2",
- "zstd",
- "zstd-safe",
+ "zstd 0.12.3+zstd.1.5.2",
+ "zstd-safe 6.0.4+zstd.1.5.4",
]
[[package]]
@@ -750,9 +753,9 @@ dependencies = [
[[package]]
name = "async-trait"
-version = "0.1.72"
+version = "0.1.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09"
+checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
dependencies = [
"proc-macro2",
"quote",
@@ -881,7 +884,7 @@ version = "1.2.6-alpha"
dependencies = [
"analytic_engine",
"arena",
- "arrow 43.0.0",
+ "arrow 49.0.0",
"base64 0.13.1",
"bytes_ext",
"clap 3.2.23",
@@ -908,7 +911,7 @@ dependencies = [
"toml_ext",
"trace_metric",
"wal",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
]
[[package]]
@@ -1452,7 +1455,7 @@ dependencies = [
"logger",
"macros",
"meta_client",
- "prost",
+ "prost 0.11.8",
"runtime",
"serde",
"serde_json",
@@ -1519,7 +1522,7 @@ dependencies = [
name = "common_types"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"arrow_ext",
"bytes_ext",
"chrono",
@@ -1528,7 +1531,7 @@ dependencies = [
"horaedbproto 2.0.0",
"macros",
"paste 1.0.12",
- "prost",
+ "prost 0.11.8",
"rand 0.7.3",
"seahash",
"serde",
@@ -1565,7 +1568,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
- "prost",
+ "prost 0.11.8",
"prost-types",
"tonic 0.9.2",
"tracing-core",
@@ -2003,13 +2006,13 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
"ahash 0.8.3",
- "arrow 43.0.0",
- "arrow-array 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-schema 49.0.0",
"async-compression",
"async-trait",
"bytes",
@@ -2021,24 +2024,22 @@ dependencies = [
"datafusion-expr",
"datafusion-optimizer",
"datafusion-physical-expr",
- "datafusion-row",
+ "datafusion-physical-plan",
"datafusion-sql",
"flate2",
"futures 0.3.28",
"glob",
+ "half 2.2.1",
"hashbrown 0.14.0",
"indexmap 2.0.0",
- "itertools 0.11.0",
- "lazy_static",
+ "itertools 0.12.0",
"log",
"num_cpus",
- "object_store 0.6.1",
+ "object_store 0.8.0",
"parking_lot 0.12.1",
"parquet",
- "percent-encoding",
"pin-project-lite",
"rand 0.8.5",
- "smallvec",
"sqlparser",
"tempfile",
"tokio",
@@ -2046,34 +2047,42 @@ dependencies = [
"url",
"uuid",
"xz2",
- "zstd",
+ "zstd 0.13.0",
]
[[package]]
name = "datafusion-common"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
- "arrow 43.0.0",
- "arrow-array 43.0.0",
+ "ahash 0.8.3",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-schema 49.0.0",
"chrono",
+ "half 2.2.1",
+ "libc",
"num_cpus",
- "object_store 0.6.1",
+ "object_store 0.8.0",
"parquet",
"sqlparser",
]
[[package]]
name = "datafusion-execution"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
+ "arrow 49.0.0",
+ "chrono",
"dashmap 5.4.0",
"datafusion-common",
"datafusion-expr",
+ "futures 0.3.28",
"hashbrown 0.14.0",
"log",
- "object_store 0.6.1",
+ "object_store 0.8.0",
"parking_lot 0.12.1",
"rand 0.8.5",
"tempfile",
@@ -2082,13 +2091,14 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
"ahash 0.8.3",
- "arrow 43.0.0",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
"datafusion-common",
- "lazy_static",
+ "paste 1.0.12",
"sqlparser",
"strum 0.25.0",
"strum_macros 0.25.1",
@@ -2096,45 +2106,43 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"async-trait",
"chrono",
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
"hashbrown 0.14.0",
- "itertools 0.11.0",
+ "itertools 0.12.0",
"log",
- "regex-syntax 0.7.1",
+ "regex-syntax 0.8.2",
]
[[package]]
name = "datafusion-physical-expr"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
"ahash 0.8.3",
- "arrow 43.0.0",
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-ord 49.0.0",
+ "arrow-schema 49.0.0",
"base64 0.21.0",
"blake2",
"blake3",
"chrono",
"datafusion-common",
"datafusion-expr",
- "datafusion-row",
"half 2.2.1",
"hashbrown 0.14.0",
"hex",
"indexmap 2.0.0",
- "itertools 0.11.0",
- "lazy_static",
- "libc",
+ "itertools 0.12.0",
"log",
"md-5",
"paste 1.0.12",
@@ -2147,37 +2155,56 @@ dependencies = [
]
[[package]]
-name = "datafusion-proto"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+name = "datafusion-physical-plan"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
- "arrow 43.0.0",
+ "ahash 0.8.3",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-schema 49.0.0",
+ "async-trait",
"chrono",
- "datafusion",
"datafusion-common",
+ "datafusion-execution",
"datafusion-expr",
- "object_store 0.6.1",
- "prost",
+ "datafusion-physical-expr",
+ "futures 0.3.28",
+ "half 2.2.1",
+ "hashbrown 0.14.0",
+ "indexmap 2.0.0",
+ "itertools 0.12.0",
+ "log",
+ "once_cell",
+ "parking_lot 0.12.1",
+ "pin-project-lite",
+ "rand 0.8.5",
+ "tokio",
+ "uuid",
]
[[package]]
-name = "datafusion-row"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+name = "datafusion-proto"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
+ "chrono",
+ "datafusion",
"datafusion-common",
- "paste 1.0.12",
- "rand 0.8.5",
+ "datafusion-expr",
+ "object_store 0.8.0",
+ "prost 0.12.3",
]
[[package]]
name = "datafusion-sql"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
dependencies = [
- "arrow 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow 49.0.0",
+ "arrow-schema 49.0.0",
"datafusion-common",
"datafusion-expr",
"log",
@@ -2187,7 +2214,7 @@ dependencies = [
[[package]]
name = "datafusion_util"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
"async-trait",
"datafusion",
@@ -2305,7 +2332,7 @@ dependencies = [
name = "df_engine_extensions"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"async-recursion",
"async-trait",
"catalog",
@@ -2318,7 +2345,7 @@ dependencies = [
"insta",
"lazy_static",
"prometheus 0.12.0",
- "prost",
+ "prost 0.11.8",
"runtime",
"snafu 0.6.10",
"table_engine",
@@ -2330,7 +2357,7 @@ dependencies = [
name = "df_operator"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"base64 0.13.1",
"bincode",
"chrono",
@@ -2470,7 +2497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58"
dependencies = [
"http",
- "prost",
+ "prost 0.11.8",
"tokio",
"tokio-stream",
"tonic 0.8.3",
@@ -2808,12 +2835,12 @@ checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
[[package]]
name = "generated_types"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
"pbjson",
"pbjson-build",
"pbjson-types",
- "prost",
+ "prost 0.11.8",
"prost-build",
"serde",
"tonic-build",
@@ -3071,7 +3098,7 @@ dependencies = [
"thiserror",
"tokio",
"tonic 0.8.3",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
]
[[package]]
@@ -3095,7 +3122,7 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5907c770ee20818978cf2050341ca2c4c7fb7888423ccb090cbb2fda250dfad7"
dependencies = [
- "prost",
+ "prost 0.11.8",
"protoc-bin-vendored",
"tonic 0.8.3",
"tonic-build",
@@ -3107,7 +3134,7 @@ name = "horaedbproto"
version = "2.0.0"
source =
"git+https://github.com/apache/incubator-horaedb-proto.git?rev=19ece8f771fc0b3e8e734072cc3d8040de6c74cb#19ece8f771fc0b3e8e734072cc3d8040de6c74cb"
dependencies = [
- "prost",
+ "prost 0.11.8",
"protoc-bin-vendored",
"tonic 0.8.3",
"tonic-build",
@@ -3325,7 +3352,7 @@ dependencies = [
[[package]]
name = "influxdb_influxql_parser"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
"chrono",
"chrono-tz",
@@ -3367,7 +3394,7 @@ name = "interpreters"
version = "1.2.6-alpha"
dependencies = [
"analytic_engine",
- "arrow 43.0.0",
+ "arrow 49.0.0",
"async-trait",
"catalog",
"catalog_impls",
@@ -3418,9 +3445,9 @@ dependencies = [
[[package]]
name = "iox_query"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"arrow_util",
"async-trait",
"chrono",
@@ -3442,9 +3469,9 @@ dependencies = [
[[package]]
name = "iox_query_influxql"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"chrono",
"chrono-tz",
"datafusion",
@@ -3497,6 +3524,15 @@ dependencies = [
"either",
]
+[[package]]
+name = "itertools"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0"
+dependencies = [
+ "either",
+]
+
[[package]]
name = "itoa"
version = "1.0.6"
@@ -3953,7 +3989,7 @@ dependencies = [
"horaedbproto 2.0.0",
"logger",
"macros",
- "prost",
+ "prost 0.11.8",
"reqwest",
"serde",
"serde_json",
@@ -4314,9 +4350,9 @@ dependencies = [
[[package]]
name = "num"
-version = "0.4.0"
+version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606"
+checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af"
dependencies = [
"num-bigint",
"num-complex",
@@ -4456,16 +4492,16 @@ dependencies = [
[[package]]
name = "object_store"
-version = "0.6.1"
+version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27c776db4f332b571958444982ff641d2531417a326ca368995073b639205d58"
+checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050"
dependencies = [
"async-trait",
"bytes",
"chrono",
"futures 0.3.28",
"humantime 2.1.0",
- "itertools 0.10.5",
+ "itertools 0.11.0",
"parking_lot 0.12.1",
"percent-encoding",
"snafu 0.7.4",
@@ -4497,7 +4533,7 @@ dependencies = [
"partitioned_lock",
"prometheus 0.12.0",
"prometheus-static-metric",
- "prost",
+ "prost 0.11.8",
"rand 0.7.3",
"runtime",
"serde",
@@ -4545,13 +4581,13 @@ dependencies = [
"tokio",
"tokio-util",
"uuid",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
]
[[package]]
name = "observability_deps"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
"tracing",
]
@@ -4675,18 +4711,18 @@ dependencies = [
[[package]]
name = "parquet"
-version = "43.0.0"
+version = "49.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec7267a9607c3f955d4d0ac41b88a67cecc0d8d009173ad3da390699a6cb3750"
+checksum = "af88740a842787da39b3d69ce5fbf6fce97d20211d3b299fee0a0da6430c74d4"
dependencies = [
"ahash 0.8.3",
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-data 43.0.0",
- "arrow-ipc 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-ipc 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
"base64 0.21.0",
"brotli",
"bytes",
@@ -4694,24 +4730,24 @@ dependencies = [
"flate2",
"futures 0.3.28",
"hashbrown 0.14.0",
- "lz4",
+ "lz4_flex",
"num",
"num-bigint",
- "object_store 0.6.1",
+ "object_store 0.8.0",
"paste 1.0.12",
"seq-macro",
"snap",
"thrift",
"tokio",
"twox-hash",
- "zstd",
+ "zstd 0.13.0",
]
[[package]]
name = "parquet_ext"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"arrow_ext",
"async-trait",
"bytes",
@@ -4738,7 +4774,7 @@ name = "partition_table_engine"
version = "1.2.6-alpha"
dependencies = [
"analytic_engine",
- "arrow 43.0.0",
+ "arrow 49.0.0",
"async-trait",
"common_types",
"datafusion",
@@ -4805,7 +4841,7 @@ checksum = "bdbb7b706f2afc610f3853550cdbbf6372fd324824a087806bd4480ea4996e24"
dependencies = [
"heck",
"itertools 0.10.5",
- "prost",
+ "prost 0.11.8",
"prost-types",
]
@@ -4819,7 +4855,7 @@ dependencies = [
"chrono",
"pbjson",
"pbjson-build",
- "prost",
+ "prost 0.11.8",
"prost-build",
"serde",
]
@@ -5179,7 +5215,7 @@ dependencies = [
"async-trait",
"bytes",
"futures 0.3.28",
- "prost",
+ "prost 0.11.8",
"prost-build",
"snap",
"warp",
@@ -5256,7 +5292,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537"
dependencies = [
"bytes",
- "prost-derive",
+ "prost-derive 0.11.8",
+]
+
+[[package]]
+name = "prost"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
+dependencies = [
+ "bytes",
+ "prost-derive 0.12.3",
]
[[package]]
@@ -5273,7 +5319,7 @@ dependencies = [
"multimap",
"petgraph",
"prettyplease 0.1.25",
- "prost",
+ "prost 0.11.8",
"prost-types",
"regex",
"syn 1.0.109",
@@ -5294,13 +5340,26 @@ dependencies = [
"syn 1.0.109",
]
+[[package]]
+name = "prost-derive"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
+dependencies = [
+ "anyhow",
+ "itertools 0.11.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.48",
+]
+
[[package]]
name = "prost-types"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88"
dependencies = [
- "prost",
+ "prost 0.11.8",
]
[[package]]
@@ -5363,7 +5422,7 @@ checksum = "9653c3ed92974e34c5a6e0a510864dab979760481714c172e0a34e437cb98804"
name = "proxy"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"arrow_ext",
"async-trait",
"bytes",
@@ -5391,7 +5450,7 @@ dependencies = [
"prom-remote-api",
"prometheus 0.12.0",
"prometheus-static-metric",
- "prost",
+ "prost 0.11.8",
"query_engine",
"query_frontend",
"router",
@@ -5409,7 +5468,7 @@ dependencies = [
"tokio-stream",
"tonic 0.8.3",
"warp",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
]
[[package]]
@@ -5463,7 +5522,7 @@ dependencies = [
name = "query_engine"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"async-trait",
"bytes_ext",
"catalog",
@@ -5478,7 +5537,7 @@ dependencies = [
"iox_query",
"logger",
"macros",
- "prost",
+ "prost 0.11.8",
"query_frontend",
"runtime",
"serde",
@@ -5493,7 +5552,7 @@ dependencies = [
name = "query_frontend"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"async-trait",
"catalog",
"chrono",
@@ -5529,9 +5588,9 @@ dependencies = [
[[package]]
name = "query_functions"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"chrono",
"datafusion",
"itertools 0.10.5",
@@ -5802,6 +5861,12 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c"
+[[package]]
+name = "regex-syntax"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
+
[[package]]
name = "remote_engine_client"
version = "1.2.6-alpha"
@@ -6227,9 +6292,9 @@ dependencies = [
[[package]]
name = "schema"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"hashbrown 0.13.2",
"indexmap 1.9.3",
"itertools 0.10.5",
@@ -6353,7 +6418,7 @@ version = "1.2.6-alpha"
dependencies = [
"analytic_engine",
"arc-swap 1.6.0",
- "arrow 43.0.0",
+ "arrow 49.0.0",
"arrow_ext",
"async-trait",
"bytes_ext",
@@ -6386,7 +6451,7 @@ dependencies = [
"prom-remote-api",
"prometheus 0.12.0",
"prometheus-static-metric",
- "prost",
+ "prost 0.11.8",
"proxy",
"query_engine",
"query_frontend",
@@ -6407,7 +6472,7 @@ dependencies = [
"tonic 0.8.3",
"wal",
"warp",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
]
[[package]]
@@ -6717,9 +6782,9 @@ dependencies = [
[[package]]
name = "sqlparser"
-version = "0.35.0"
+version = "0.39.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca597d77c98894be1f965f2e4e2d2a61575d4998088e655476c73715c54b2b43"
+checksum = "743b4dc2cbde11890ccb254a8fc9d537fa41b36da00de2a1c5e9848c9bc42bd7"
dependencies = [
"log",
"serde",
@@ -6897,7 +6962,7 @@ dependencies = [
name = "system_catalog"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"async-trait",
"bytes_ext",
"catalog",
@@ -6908,7 +6973,7 @@ dependencies = [
"horaedbproto 2.0.0",
"logger",
"macros",
- "prost",
+ "prost 0.11.8",
"snafu 0.6.10",
"table_engine",
"tokio",
@@ -6927,7 +6992,7 @@ dependencies = [
name = "table_engine"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"arrow_ext",
"async-trait",
"bytes_ext",
@@ -6943,7 +7008,7 @@ dependencies = [
"lazy_static",
"logger",
"macros",
- "prost",
+ "prost 0.11.8",
"rand 0.7.3",
"regex",
"runtime",
@@ -7024,7 +7089,7 @@ dependencies = [
[[package]]
name = "test_helpers"
version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
dependencies = [
"dotenvy",
"observability_deps",
@@ -7038,7 +7103,7 @@ dependencies = [
name = "test_util"
version = "1.2.6-alpha"
dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
"chrono",
"common_types",
"env_logger",
@@ -7375,8 +7440,8 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
- "prost",
- "prost-derive",
+ "prost 0.11.8",
+ "prost-derive 0.11.8",
"rustls-pemfile 1.0.2",
"tokio",
"tokio-rustls 0.23.4",
@@ -7408,7 +7473,7 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
- "prost",
+ "prost 0.11.8",
"tokio",
"tokio-stream",
"tower",
@@ -7647,7 +7712,7 @@ version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
- "cfg-if 0.1.10",
+ "cfg-if 1.0.0",
"rand 0.8.5",
"static_assertions",
]
@@ -7804,7 +7869,7 @@ dependencies = [
"macros",
"message_queue",
"prometheus 0.12.0",
- "prost",
+ "prost 0.11.8",
"rand 0.8.5",
"rocksdb",
"runtime",
@@ -8433,7 +8498,16 @@ version = "0.12.3+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806"
dependencies = [
- "zstd-safe",
+ "zstd-safe 6.0.4+zstd.1.5.4",
+]
+
+[[package]]
+name = "zstd"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110"
+dependencies = [
+ "zstd-safe 7.0.0",
]
[[package]]
@@ -8446,6 +8520,15 @@ dependencies = [
"zstd-sys",
]
+[[package]]
+name = "zstd-safe"
+version = "7.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e"
+dependencies = [
+ "zstd-sys",
+]
+
[[package]]
name = "zstd-sys"
version = "2.0.7+zstd.1.5.4"
diff --git a/Cargo.toml b/Cargo.toml
index d195a121..b41694b3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -85,8 +85,8 @@ members = [
[workspace.dependencies]
alloc_tracker = { path = "src/components/alloc_tracker" }
-arrow = { version = "43.0.0", features = ["prettyprint"] }
-arrow_ipc = { version = "43.0.0" }
+arrow = { version = "49.0.0", features = ["prettyprint"] }
+arrow_ipc = { version = "49.0.0" }
arrow_ext = { path = "src/components/arrow_ext" }
analytic_engine = { path = "src/analytic_engine" }
arena = { path = "src/components/arena" }
@@ -107,8 +107,8 @@ cluster = { path = "src/cluster" }
criterion = "0.5"
horaedb-client = "1.0.2"
common_types = { path = "src/common_types" }
-datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
-datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
+datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
+datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
derive_builder = "0.12"
df_operator = { path = "src/df_operator" }
df_engine_extensions = { path = "src/df_engine_extensions" }
@@ -121,10 +121,10 @@ hash_ext = { path = "src/components/hash_ext" }
hex = "0.4.3"
hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
id_allocator = { path = "src/components/id_allocator" }
-influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query_influxql" }
-influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "influxdb_influxql_parser" }
-influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query" }
-influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "schema" }
+influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query_influxql" }
+influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "influxdb_influxql_parser" }
+influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query" }
+influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "schema" }
interpreters = { path = "src/interpreters" }
itertools = "0.10.5"
lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
@@ -142,7 +142,7 @@ panic_ext = { path = "src/components/panic_ext" }
partitioned_lock = { path = "src/components/partitioned_lock" }
partition_table_engine = { path = "src/partition_table_engine" }
parquet_ext = { path = "src/components/parquet_ext" }
-parquet = { version = "43.0.0" }
+parquet = { version = "49.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
pprof = "0.12.1"
@@ -172,9 +172,9 @@ size_ext = { path = "src/components/size_ext" }
smallvec = "1.6"
slog = "2.7"
spin = "0.9.6"
-sqlparser = { version = "0.35", features = ["serde"] }
-system_catalog = { path = "src/system_catalog" }
system_statis = { path = "src/components/system_stats" }
+sqlparser = { version = "0.39.0", features = ["serde"] }
+system_catalog = { path = "src/system_catalog" }
table_engine = { path = "src/table_engine" }
table_kv = { path = "src/components/table_kv" }
tempfile = "3.1.0"
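Worth noting in the hunks above: sqlparser moves from 0.35 to 0.39.0, the release datafusion 33 itself links against; the workspace and datafusion must agree on a single sqlparser major for the AST types to interoperate. A quick sanity check using sqlparser's long-stable entry point (illustrative, not part of this change):

```rust
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

fn main() -> Result<(), sqlparser::parser::ParserError> {
    // Statements used by the integration tests should parse identically
    // after the 0.35 -> 0.39 bump.
    let sql = "SELECT count(DISTINCT name) FROM issue302 GROUP BY t";
    let statements = Parser::parse_sql(&GenericDialect {}, sql)?;
    assert_eq!(statements.len(), 1);
    Ok(())
}
```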
diff --git a/integration_tests/cases/common/dml/issue-1087.result b/integration_tests/cases/common/dml/issue-1087.result
index d264f4d2..fc1e0d8d 100644
--- a/integration_tests/cases/common/dml/issue-1087.result
+++ b/integration_tests/cases/common/dml/issue-1087.result
@@ -17,6 +17,7 @@ String("logical_plan after inline_table_scan"),String("SAME TEXT AS ABOVE"),
String("logical_plan after type_coercion"),String("SAME TEXT AS ABOVE"),
String("logical_plan after count_wildcard_rule"),String("SAME TEXT AS ABOVE"),
String("analyzed_logical_plan"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
@@ -33,6 +34,7 @@ String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
@@ -46,6 +48,7 @@ String("logical_plan after eliminate_projection"),String("TableScan: issue_1087
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
@@ -62,6 +65,7 @@ String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
@@ -76,17 +80,22 @@ String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"),
-String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low\n"),
+String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
+String("initial_physical_plan_with_stats"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"),
+String("physical_plan after OutputRequirements"),String("OutputRequirementExec\n ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("physical_plan after aggregate_statistics"),String("SAME TEXT AS ABOVE"),
String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"),
-String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"),
-String("physical_plan after repartition"),String("SAME TEXT AS ABOVE"),
+String("physical_plan after LimitedDistinctAggregation"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceDistribution"),String("SAME TEXT AS ABOVE"),
String("physical_plan after CombinePartialFinalAggregate"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"),
String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"),
+String("physical_plan after OutputRequirements"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"),
-String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low\n"),
+String("physical_plan after LimitAggregation"),String("SAME TEXT AS ABOVE"),
+String("physical_plan after ProjectionPushdown"),String("SAME TEXT AS ABOVE"),
+String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
+String("physical_plan_with_stats"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"),
DROP TABLE `issue_1087`;
diff --git a/integration_tests/cases/common/dml/issue-302.result b/integration_tests/cases/common/dml/issue-302.result
index b57d881f..cd7afc3a 100644
--- a/integration_tests/cases/common/dml/issue-302.result
+++ b/integration_tests/cases/common/dml/issue-302.result
@@ -12,7 +12,7 @@ affected_rows: 1
select `t`, count(distinct name) from issue302 group by `t`;
-issue302.t,COUNT(DISTINCT issue302.name),
+t,COUNT(DISTINCT issue302.name),
Timestamp(1651737067000),Int64(0),
diff --git a/integration_tests/cases/common/dml/issue-341.result b/integration_tests/cases/common/dml/issue-341.result
index 90222259..4d7da95c 100644
--- a/integration_tests/cases/common/dml/issue-341.result
+++ b/integration_tests/cases/common/dml/issue-341.result
@@ -58,7 +58,7 @@ WHERE
plan_type,plan,
String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp,
value], full_filters=[issue341_t1.value = Int32(3)]"),
-String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8,
priority=Low\n"),
+String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8)\n"),
-- FilterExec node should not be in plan.
@@ -71,8 +71,8 @@ WHERE
tag1 = "t3";
plan_type,plan,
-String("logical_plan"),String("Projection: issue341_t1.timestamp,
issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value,
tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
-String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as
timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8,
priority=Low\n"),
+String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp,
value], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as
timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8)\n"),
-- Repeat operations above, but with overwrite table
@@ -116,7 +116,7 @@ WHERE
plan_type,plan,
String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n
TableScan: issue341_t2 projection=[timestamp, value],
partial_filters=[issue341_t2.value = Float64(3)]"),
-String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n
FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8,
priority=Low\n"),
+String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n
FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8)\n"),
-- When using tag as filter, FilterExec node should not be in plan.
@@ -129,8 +129,8 @@ WHERE
tag1 = "t3";
plan_type,plan,
-String("logical_plan"),String("Projection: issue341_t2.timestamp,
issue341_t2.value\n TableScan: issue341_t2 projection=[timestamp, value,
tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
-String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as
timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8,
priority=Low\n"),
+String("logical_plan"),String("TableScan: issue341_t2 projection=[timestamp,
value], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as
timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8)\n"),
DROP TABLE IF EXISTS `issue341_t1`;
diff --git a/integration_tests/cases/common/dml/issue-59.result b/integration_tests/cases/common/dml/issue-59.result
index 549c7019..4f7544c8 100644
--- a/integration_tests/cases/common/dml/issue-59.result
+++ b/integration_tests/cases/common/dml/issue-59.result
@@ -24,8 +24,8 @@ FROM issue59
GROUP BY id+1;
plan_type,plan,
-String("logical_plan"),String("Projection: group_alias_0 AS issue59.id +
Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate:
groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection:
group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) +
Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n
TableScan: issue59 projection=[id, account]"),
-String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as
issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n
AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0],
aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n
RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n
AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0],
aggr=[COUNT(alias1)]\n [...]
+String("logical_plan"),String("Projection: group_alias_0 AS issue59.id +
Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate:
groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Aggregate:
groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0,
issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59
projection=[id, account]"),
+String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as
issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n
AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0],
aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n
RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n
AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0],
aggr=[COUNT(alias1)]\n [...]
DROP TABLE IF EXISTS issue59;
diff --git a/integration_tests/cases/common/explain/explain.result b/integration_tests/cases/common/explain/explain.result
index 0cd06380..6cf09c07 100644
--- a/integration_tests/cases/common/explain/explain.result
+++ b/integration_tests/cases/common/explain/explain.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT t FROM `04_explain_t`;
plan_type,plan,
String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"),
-String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8,
priority=Low\n"),
+String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8)\n"),
DROP TABLE `04_explain_t`;
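The only change in this file is the new `partition_count=UnknownPartitioning(8)` suffix: with datafusion 33, the scan node's display includes its output partitioning. A sketch of the idea; `ScanTableDisplay` and its fields are hypothetical stand-ins, while `Partitioning` is the real datafusion 33 type (the actual ScanTable lives in src/table_engine/src/provider.rs, listed in the diffstat above):

```rust
use datafusion::physical_plan::Partitioning;

// Hypothetical stand-in for the scan node's display state.
struct ScanTableDisplay {
    table: String,
    parallelism: usize,
}

impl ScanTableDisplay {
    fn render(&self) -> String {
        // The scan knows only how many partitions it exposes, not how rows
        // are distributed across them, hence UnknownPartitioning(parallelism).
        let partitioning = Partitioning::UnknownPartitioning(self.parallelism);
        format!(
            "ScanTable: table={}, parallelism={}, partition_count={:?}",
            self.table, self.parallelism, partitioning
        )
    }
}
```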
diff --git a/integration_tests/cases/common/function/aggregate.result b/integration_tests/cases/common/function/aggregate.result
index 037e503a..f45a6841 100644
--- a/integration_tests/cases/common/function/aggregate.result
+++ b/integration_tests/cases/common/function/aggregate.result
@@ -105,7 +105,50 @@ COUNT(DISTINCT 02_function_aggregate_table1.arch),
Int64(2),
+CREATE TABLE `02_function_aggregate_table2` (
+ `timestamp` timestamp NOT NULL,
+ `arch` string TAG,
+ `datacenter` string TAG,
+ `value` int,
+ `uvalue` uint64,
+ timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+ enable_ttl='false',
+ update_mode = 'append'
+);
+
+affected_rows: 0
+
+INSERT INTO `02_function_aggregate_table2`
+ (`timestamp`, `arch`, `datacenter`, `value`, `uvalue`)
+VALUES
+ (1658304762, 'x86-64', 'china', 100, 10),
+ (1658304763, 'x86-64', 'china', 200, 10),
+ (1658304762, 'arm64', 'china', 110, 0),
+ (1658304763, 'arm64', 'china', 210, 0);
+
+affected_rows: 4
+
+-- This should select an empty column
+SELECT count(*) FROM `02_function_aggregate_table1`;
+
+COUNT(*),
+Int64(4),
+
+
+-- Same as before, but query from sst
+-- SQLNESS ARG pre_cmd=flush
+SELECT count(*) FROM `02_function_aggregate_table1`;
+
+COUNT(*),
+Int64(4),
+
+
DROP TABLE `02_function_aggregate_table1`;
affected_rows: 0
+DROP TABLE `02_function_aggregate_table2`;
+
+affected_rows: 0
+
diff --git a/integration_tests/cases/common/function/aggregate.sql b/integration_tests/cases/common/function/aggregate.sql
index c4f8dd50..8543245a 100644
--- a/integration_tests/cases/common/function/aggregate.sql
+++ b/integration_tests/cases/common/function/aggregate.sql
@@ -57,4 +57,32 @@ SELECT distinct(`arch`) FROM `02_function_aggregate_table1` ORDER BY `arch` DESC
SELECT count(distinct(`arch`)) FROM `02_function_aggregate_table1`;
+CREATE TABLE `02_function_aggregate_table2` (
+ `timestamp` timestamp NOT NULL,
+ `arch` string TAG,
+ `datacenter` string TAG,
+ `value` int,
+ `uvalue` uint64,
+ timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+ enable_ttl='false',
+ update_mode = 'append'
+);
+
+INSERT INTO `02_function_aggregate_table2`
+ (`timestamp`, `arch`, `datacenter`, `value`, `uvalue`)
+VALUES
+ (1658304762, 'x86-64', 'china', 100, 10),
+ (1658304763, 'x86-64', 'china', 200, 10),
+ (1658304762, 'arm64', 'china', 110, 0),
+ (1658304763, 'arm64', 'china', 210, 0);
+
+-- This should select an empty column
+SELECT count(*) FROM `02_function_aggregate_table1`;
+
+-- Same as before, but query from sst
+-- SQLNESS ARG pre_cmd=flush
+SELECT count(*) FROM `02_function_aggregate_table1`;
+
DROP TABLE `02_function_aggregate_table1`;
+DROP TABLE `02_function_aggregate_table2`;
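The new count(*) cases cover scans where the planner requests zero columns ("select an empty column" above): the scan must still report how many rows exist, both from memtables and from flushed SSTs. A minimal arrow 49 sketch of producing zero-column batches with an explicit row count; this is illustrative, not the engine's actual code path:

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

// A batch with no columns carries its row count explicitly so that
// count(*) over an empty projection still aggregates correctly.
fn empty_projection_batch(num_rows: usize) -> Result<RecordBatch, ArrowError> {
    let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
    RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &options)
}
```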
diff --git a/integration_tests/cases/common/optimizer/optimizer.result b/integration_tests/cases/common/optimizer/optimizer.result
index f9cfac2d..5df9f47e 100644
--- a/integration_tests/cases/common/optimizer/optimizer.result
+++ b/integration_tests/cases/common/optimizer/optimizer.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY
plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1,
AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]],
aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan:
07_optimizer_t projection=[name, value]"),
-String("physical_plan"),String("ProjectionExec:
expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name],
aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n
CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec:
partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec:
mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value),
AVG(07_optimizer_t.value)]\n [...]
+String("physical_plan"),String("ProjectionExec:
expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name],
aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n
CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec:
partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec:
mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value),
AVG(07_optimizer_t.value)]\n [...]
DROP TABLE `07_optimizer_t`;
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result
index d376718c..233c3483 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -80,19 +80,23 @@ UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n
__partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec,
metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable:
table=__partition_table_t_1, parallelism=8, priority=Low, metrics=[\nPredicate
{ exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable:
table=__partition_table_t_1, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name =
Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start:
Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807)
} }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n
in [...]
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
+-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0",
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n
__partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n __partition_table_t_x:\n
poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n
__partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n\n__partition_table_t_x:\n [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredica [...]
ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql
b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index a36b59ac..a87dfbb2 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -37,11 +37,15 @@ SELECT * from partition_table_t where name in ("horaedb5",
"horaedb6", "horaedb7
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
+-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0",
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/local/ddl/query-plan.result
b/integration_tests/cases/env/local/ddl/query-plan.result
index a421856b..1f632184 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.result
+++ b/integration_tests/cases/env/local/ddl/query-plan.result
@@ -27,48 +27,53 @@ affected_rows: 3
-- This query should include memtable
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348001000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348001001),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=true\n iter_num=1\n merge_iter_0:\n
init_duration=xxs\n num_memtables=1\n num_ssts=0\n
scan_count=2\n scan_durat [...]
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >
TimestampMillisecond(1695348001000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n
num_memtables=1\n num_ssts=0\n [...]
-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_dml_select_real_time_range`
where t >= 1695348001000 and t < 1695348002000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=High,
metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t
< TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n
num_memtables=1\n num_ssts= [...]
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=High,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >=
TimestampMillisecond(1695348001000, None), t <
TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n [...]
-- This query should not include memtable
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348002000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348002001),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=true\n iter_num=0\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >
TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348002001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=0\n=0]\n"),
-- SQLNESS ARG pre_cmd=flush
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- This query should include SST
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348001000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348001001),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=true\n iter_num=1\n merge_iter_0:\n
init_duration=xxs\n num_memtables=0\n num_ssts=1\n
scan_count=2\n scan_durat [...]
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >
TimestampMillisecond(1695348001000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n
num_memtables=0\n num_ssts=1\n [...]
-- This query should not include SST
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348002000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348002001),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=true\n iter_num=0\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >
TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348002001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=0\n=0]\n"),
-- Table with an 'append' update mode
@@ -97,11 +102,12 @@ affected_rows: 3
-- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx
-- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';
plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable:
table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), name =
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start:
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) }
}\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n
num_memtables=1\n [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=xx\n ScanTable: table=03_append_mode_table, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), name =
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start:
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) }
}\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n
num_memtables= [...]
-- Should just fetch projected columns from SST
@@ -111,11 +117,12 @@ String("Plan with Metrics"),String("ProjectionExec:
expr=[t@0 as t], metrics=[ou
-- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';
plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable:
table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), name =
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start:
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) }
}\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n
num_memtables=0\n [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=xx\n ScanTable: table=03_append_mode_table, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), name =
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start:
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) }
}\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n
num_memtables= [...]
CREATE TABLE `TEST_QUERY_PRIORITY` (
@@ -132,20 +139,22 @@ affected_rows: 0
-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000 and TS < 1695348002000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY,
parallelism=8, priority=High, metrics=[\nPredicate { exprs:[TS >=
TimestampMillisecond(1695348001000, None), TS <
TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=false\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY,
parallelism=8, priority=High, partition_count=UnknownPartitioning(8),
metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None),
TS < TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=false\n=0]\n"),
-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY,
parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[TS >=
TimestampMillisecond(1695348001000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=false\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY,
parallelism=8, priority=Low, partition_count=UnknownPartitioning(8),
metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348001000),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=false\n=0]\n"),
DROP TABLE `03_dml_select_real_time_range`;
diff --git a/integration_tests/cases/env/local/ddl/query-plan.sql
b/integration_tests/cases/env/local/ddl/query-plan.sql
index 218e0f7b..5217b1a0 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.sql
+++ b/integration_tests/cases/env/local/ddl/query-plan.sql
@@ -18,27 +18,32 @@ INSERT INTO `03_dml_select_real_time_range` (t, name, value)
-- This query should include memtable
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348001000;
-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_dml_select_real_time_range`
where t >= 1695348001000 and t < 1695348002000;
-- This query should not include memtable
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348002000;
-- SQLNESS ARG pre_cmd=flush
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- This query should include SST
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348001000;
-- This query should not include SST
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348002000;
@@ -64,6 +69,7 @@ INSERT INTO `03_append_mode_table` (t, name, value)
-- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx
-- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';
@@ -74,6 +80,7 @@ where t >= 1695348001000 and name = 'ceresdb';
-- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';
@@ -89,11 +96,13 @@ CREATE TABLE `TEST_QUERY_PRIORITY` (
-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000 and TS < 1695348002000;
-- This query should have higher priority
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
explain analyze select TS from `TEST_QUERY_PRIORITY`
where TS >= 1695348001000;
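The `metrics=\[.*?s\]` REPLACE rule added throughout these test files redacts whole metrics blobs, whose trailing values are run-to-run timing numbers. A minimal sketch of the substitution it performs, using the regex crate for illustration (the actual sqlness implementation may differ):

    use regex::Regex;

    fn main() {
        // Non-greedy match up to the first `s]`, where the trailing
        // elapsed_compute/poll timing values in these plans end.
        let re = Regex::new(r"metrics=\[.*?s\]").unwrap();
        let line = "CoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=1.2µs]";
        assert_eq!(
            re.replace_all(line, "metrics=xx"),
            "CoalescePartitionsExec, metrics=xx"
        );
    }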
diff --git a/src/analytic_engine/src/instance/reorder_memtable.rs
b/src/analytic_engine/src/instance/reorder_memtable.rs
index e6eab4d1..c37417bf 100644
--- a/src/analytic_engine/src/instance/reorder_memtable.rs
+++ b/src/analytic_engine/src/instance/reorder_memtable.rs
@@ -147,8 +147,11 @@ impl ExecutionPlan for ScanMemIter {
}))
}
- fn statistics(&self) -> Statistics {
- Statistics::default()
+ fn statistics(
+ &self,
+ ) -> std::result::Result<datafusion::common::Statistics,
datafusion::error::DataFusionError>
+ {
+ Ok(Statistics::new_unknown(&self.schema()))
}
}
@@ -259,8 +262,8 @@ impl Reorder {
pub async fn into_stream(self) ->
Result<SendableFetchingRecordBatchStream> {
// 1. Init datafusion context
let runtime = Arc::new(RuntimeEnv::default());
- let state = SessionState::with_config_rt(SessionConfig::new(),
runtime);
- let ctx = SessionContext::with_state(state);
+ let state = SessionState::new_with_config_rt(SessionConfig::new(),
runtime);
+ let ctx = SessionContext::new_with_state(state);
let table_provider = Arc::new(MemIterProvider {
arrow_schema: self.schema.to_arrow_schema_ref(),
iter: Mutex::new(Some(self.iter)),
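The `statistics` hunk above is the pattern repeated for every `ExecutionPlan` in this commit: in this DataFusion version the method is fallible, and unknown statistics are derived from the plan's output schema rather than `Statistics::default()`. A minimal sketch of the new shape:

    use datafusion::common::Statistics;
    use datafusion::error::Result as DfResult;

    // Inside an `impl ExecutionPlan for ...` block; `self.schema()` is the
    // plan's output schema, and `new_unknown` marks every column's stats as
    // absent instead of defaulting them.
    fn statistics(&self) -> DfResult<Statistics> {
        Ok(Statistics::new_unknown(&self.schema()))
    }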
diff --git a/src/analytic_engine/src/memtable/skiplist/iter.rs
b/src/analytic_engine/src/memtable/skiplist/iter.rs
index 4787b754..cce3913d 100644
--- a/src/analytic_engine/src/memtable/skiplist/iter.rs
+++ b/src/analytic_engine/src/memtable/skiplist/iter.rs
@@ -154,6 +154,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
assert!(self.batch_size > 0);
let record_schema = self.row_projector.fetched_schema().clone();
+ let is_empty_projection = record_schema.columns().is_empty();
let primary_key_indexes = self
.row_projector
.primary_key_indexes()
@@ -183,6 +184,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
}
}
+ if is_empty_projection {
+ builder.inc_row_num(num_rows);
+ }
+
if num_rows > 0 {
if let Some(deadline) = self.deadline {
let now = Instant::now();
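The `is_empty_projection` branch above handles projections that fetch zero columns (e.g. `select count(*)`): since the change tracked in arrow-datafusion#7920 (cited in record_batch.rs below), the row count of such a batch must be carried explicitly. A runnable arrow sketch of the underlying behaviour:

    use std::sync::Arc;
    use arrow::array::ArrayRef;
    use arrow::datatypes::Schema;
    use arrow::record_batch::{RecordBatch, RecordBatchOptions};

    fn main() {
        // With no columns there is nothing to infer the row count from, so
        // it has to be supplied through RecordBatchOptions.
        let options = RecordBatchOptions::new().with_row_count(Some(3));
        let batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            Vec::<ArrayRef>::new(),
            &options,
        )
        .unwrap();
        assert_eq!(batch.num_rows(), 3);
    }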
diff --git a/src/analytic_engine/src/row_iter/record_batch_stream.rs
b/src/analytic_engine/src/row_iter/record_batch_stream.rs
index 2a39c648..49c41f24 100644
--- a/src/analytic_engine/src/row_iter/record_batch_stream.rs
+++ b/src/analytic_engine/src/row_iter/record_batch_stream.rs
@@ -161,6 +161,7 @@ fn filter_record_batch(
let filter_array = predicate
.evaluate(record_batch)
.map(|v| v.into_array(record_batch.num_rows()))
+ .context(FilterExec)?
.context(FilterExec)?;
let selected_rows = filter_array
.as_any()
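The doubled `.context(FilterExec)?` above is needed because, in the DataFusion revision this commit pins, `ColumnarValue::into_array` is itself fallible, so the `map` yields a nested `Result`. A hedged sketch of the shape:

    use arrow::array::ArrayRef;
    use datafusion::physical_plan::ColumnarValue;

    // `evaluate` yields Result<ColumnarValue>; `into_array` now yields a
    // Result<ArrayRef> as well, hence two layers to unwrap. `num_rows` is the
    // batch row count used to materialize scalar values into an array.
    fn to_array(value: ColumnarValue, num_rows: usize) -> datafusion::error::Result<ArrayRef> {
        value.into_array(num_rows)
    }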
diff --git a/src/analytic_engine/src/table/mod.rs
b/src/analytic_engine/src/table/mod.rs
index af381b5b..674f6b3b 100644
--- a/src/analytic_engine/src/table/mod.rs
+++ b/src/analytic_engine/src/table/mod.rs
@@ -430,6 +430,7 @@ pub fn support_pushdown(schema: &Schema, need_dedup: bool,
col_names: &[String])
}
    // When the table needs dedup, only unique key columns support pushdown
+ // See https://github.com/apache/incubator-horaedb/issues/605
col_names
.iter()
.all(|col_name| schema.is_unique_column(col_name.as_str()))
diff --git a/src/common_types/src/datum.rs b/src/common_types/src/datum.rs
index d152e960..9b22439a 100644
--- a/src/common_types/src/datum.rs
+++ b/src/common_types/src/datum.rs
@@ -292,9 +292,11 @@ impl TryFrom<&SqlDataType> for DatumKind {
SqlDataType::Double => Ok(Self::Double),
SqlDataType::Boolean => Ok(Self::Boolean),
SqlDataType::BigInt(_) => Ok(Self::Int64),
+ SqlDataType::Int64 => Ok(Self::Int64),
SqlDataType::Int(_) => Ok(Self::Int32),
+ SqlDataType::Int8(_) => Ok(Self::Int8),
SqlDataType::SmallInt(_) => Ok(Self::Int16),
- SqlDataType::String => Ok(Self::String),
+ SqlDataType::String(_) => Ok(Self::String),
SqlDataType::Varbinary(_) => Ok(Self::Varbinary),
SqlDataType::Date => Ok(Self::Date),
SqlDataType::Time(_, _) => Ok(Self::Time),
@@ -1453,7 +1455,7 @@ impl Datum {
ScalarValue::Date32(v) => v.map(Datum::Date),
ScalarValue::Time64Nanosecond(v) => v.map(Datum::Time),
ScalarValue::Dictionary(_, literal) =>
Datum::from_scalar_value(literal),
- ScalarValue::List(_, _)
+ ScalarValue::List(_)
| ScalarValue::Date64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
@@ -1467,10 +1469,12 @@ impl Datum {
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_)
- | ScalarValue::Fixedsizelist(_, _, _)
+ | ScalarValue::FixedSizeList(_)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
+ | ScalarValue::Decimal256(_, _, _)
+ | ScalarValue::LargeList(_)
| ScalarValue::DurationNanosecond(_) => None,
}
}
@@ -1502,7 +1506,7 @@ impl<'a> DatumView<'a> {
v.map(|v| DatumView::Timestamp(Timestamp::new(v)))
}
ScalarValue::Dictionary(_, literal) =>
DatumView::from_scalar_value(literal),
- ScalarValue::List(_, _)
+ ScalarValue::List(_)
| ScalarValue::Date64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
@@ -1516,10 +1520,12 @@ impl<'a> DatumView<'a> {
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Null
| ScalarValue::IntervalMonthDayNano(_)
- | ScalarValue::Fixedsizelist(_, _, _)
+ | ScalarValue::FixedSizeList(_)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
+ | ScalarValue::Decimal256(_, _, _)
+ | ScalarValue::LargeList(_)
| ScalarValue::DurationNanosecond(_) => None,
}
}
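The match rewrites above track the `ScalarValue` surface of the pinned DataFusion revision: `List` and `FixedSizeList` are now single-payload tuple variants, and `Decimal256`/`LargeList` are new. A small hedged sketch of matching against the new shape:

    use datafusion::scalar::ScalarValue;

    // All of the list-like variants this commit has to route to `None`.
    fn is_unsupported_list(v: &ScalarValue) -> bool {
        matches!(
            v,
            ScalarValue::List(_) | ScalarValue::LargeList(_) | ScalarValue::FixedSizeList(_)
        )
    }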
diff --git a/src/common_types/src/projected_schema.rs
b/src/common_types/src/projected_schema.rs
index 30e9eb01..1eff7dc4 100644
--- a/src/common_types/src/projected_schema.rs
+++ b/src/common_types/src/projected_schema.rs
@@ -105,7 +105,7 @@ pub struct RowProjector {
/// For example:
/// source columns in sst: 0,1,2,3,4
/// target projection columns: 2,1,3
- ///
+ ///
/// the actual columns in fetched record: 1,2,3
/// relative columns indexes in fetched record: 0,1,2
///
@@ -347,6 +347,10 @@ impl ProjectedSchema {
pub fn table_schema(&self) -> &Schema {
&self.0.table_schema
}
+
+ pub fn target_column_schema(&self, i: usize) -> &ColumnSchema {
+ self.0.target_record_schema.column(i)
+ }
}
impl From<ProjectedSchema> for horaedbproto::schema::ProjectedSchema {
diff --git a/src/common_types/src/record_batch.rs
b/src/common_types/src/record_batch.rs
index 2a543ca5..0278aa70 100644
--- a/src/common_types/src/record_batch.rs
+++ b/src/common_types/src/record_batch.rs
@@ -24,7 +24,7 @@ use arrow::{
compute,
datatypes::{DataType, Field, Schema, SchemaRef as ArrowSchemaRef,
TimeUnit},
error::ArrowError,
- record_batch::RecordBatch as ArrowRecordBatch,
+ record_batch::{RecordBatch as ArrowRecordBatch, RecordBatchOptions},
};
use arrow_ext::operation;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
@@ -124,14 +124,18 @@ pub struct RecordBatchData {
}
impl RecordBatchData {
- fn new(arrow_schema: ArrowSchemaRef, column_blocks: Vec<ColumnBlock>) ->
Result<Self> {
+ fn new(
+ arrow_schema: ArrowSchemaRef,
+ column_blocks: Vec<ColumnBlock>,
+ options: RecordBatchOptions,
+ ) -> Result<Self> {
let arrays = column_blocks
.iter()
.map(|column| column.to_arrow_array_ref())
- .collect();
-
+ .collect::<Vec<_>>();
let arrow_record_batch =
- ArrowRecordBatch::try_new(arrow_schema,
arrays).context(CreateArrow)?;
+ ArrowRecordBatch::try_new_with_options(arrow_schema, arrays,
&options)
+ .context(CreateArrow)?;
Ok(RecordBatchData {
arrow_record_batch,
@@ -140,10 +144,7 @@ impl RecordBatchData {
}
fn num_rows(&self) -> usize {
- self.column_blocks
- .first()
- .map(|column| column.num_rows())
- .unwrap_or(0)
+ self.arrow_record_batch.num_rows()
}
fn take_column_block(&mut self, index: usize) -> ColumnBlock {
@@ -227,9 +228,13 @@ impl RecordBatch {
}
}
- pub fn new(schema: RecordSchema, column_blocks: Vec<ColumnBlock>) ->
Result<Self> {
+ pub fn new(
+ schema: RecordSchema,
+ column_blocks: Vec<ColumnBlock>,
+ num_rows: usize,
+ ) -> Result<Self> {
ensure!(schema.num_columns() == column_blocks.len(), SchemaLen);
-
+ let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
// Validate schema and column_blocks.
for (column_schema, column_block) in
schema.columns().iter().zip(column_blocks.iter()) {
ensure!(
@@ -243,7 +248,7 @@ impl RecordBatch {
}
let arrow_schema = schema.to_arrow_schema_ref();
- let data = RecordBatchData::new(arrow_schema, column_blocks)?;
+ let data = RecordBatchData::new(arrow_schema, column_blocks, options)?;
Ok(Self { schema, data })
}
@@ -388,6 +393,7 @@ impl FetchedRecordBatch {
let mut column_blocks =
Vec::with_capacity(fetched_schema.num_columns());
let num_rows = arrow_record_batch.num_rows();
let num_columns = arrow_record_batch.num_columns();
+ let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
for (col_idx_opt, col_schema) in
column_indexes.iter().zip(fetched_schema.columns()) {
match col_idx_opt {
Some(col_idx) => {
@@ -419,7 +425,8 @@ impl FetchedRecordBatch {
}
}
- let data = RecordBatchData::new(fetched_schema.to_arrow_schema_ref(),
column_blocks)?;
+ let data =
+ RecordBatchData::new(fetched_schema.to_arrow_schema_ref(),
column_blocks, options)?;
Ok(FetchedRecordBatch {
schema: fetched_schema,
@@ -471,6 +478,8 @@ impl FetchedRecordBatch {
// Get the schema after projection.
let record_schema = projected_schema.to_record_schema();
let mut column_blocks =
Vec::with_capacity(record_schema.num_columns());
+ let num_rows = self.data.num_rows();
+ let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
for column_schema in record_schema.columns() {
let column_index =
@@ -485,8 +494,8 @@ impl FetchedRecordBatch {
column_blocks.push(column_block);
}
- let data = RecordBatchData::new(record_schema.to_arrow_schema_ref(),
column_blocks)?;
-
+ let data =
+ RecordBatchData::new(record_schema.to_arrow_schema_ref(),
column_blocks, options)?;
Ok(RecordBatch {
schema: record_schema,
data,
@@ -582,6 +591,7 @@ pub struct FetchedRecordBatchBuilder {
fetched_schema: RecordSchema,
primary_key_indexes: Option<Vec<usize>>,
builders: Vec<ColumnBlockBuilder>,
+ num_rows: usize,
}
impl FetchedRecordBatchBuilder {
@@ -601,6 +611,7 @@ impl FetchedRecordBatchBuilder {
fetched_schema,
primary_key_indexes,
builders,
+ num_rows: 0,
}
}
@@ -624,6 +635,7 @@ impl FetchedRecordBatchBuilder {
fetched_schema: record_schema,
primary_key_indexes,
builders,
+ num_rows: 0,
}
}
@@ -671,6 +683,13 @@ impl FetchedRecordBatchBuilder {
Ok(())
}
+    /// When the record batch contains no columns, its row num may not be 0,
+    /// so we need to inc the row num explicitly in this case.
+ /// See: https://github.com/apache/arrow-datafusion/pull/7920
+ pub fn inc_row_num(&mut self, n: usize) {
+ self.num_rows += n;
+ }
+
/// Append `len` from `start` (inclusive) to this builder.
///
/// REQUIRE:
@@ -702,7 +721,7 @@ impl FetchedRecordBatchBuilder {
self.builders
.first()
.map(|builder| builder.len())
- .unwrap_or(0)
+ .unwrap_or(self.num_rows)
}
/// Returns true if the builder is empty.
@@ -725,11 +744,16 @@ impl FetchedRecordBatchBuilder {
.map(|builder| builder.build())
.collect();
let arrow_schema = self.fetched_schema.to_arrow_schema_ref();
+ let num_rows = column_blocks
+ .first()
+ .map(|block| block.num_rows())
+ .unwrap_or(self.num_rows);
+ let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
Ok(FetchedRecordBatch {
schema: self.fetched_schema.clone(),
primary_key_indexes: self.primary_key_indexes.clone(),
- data: RecordBatchData::new(arrow_schema, column_blocks)?,
+ data: RecordBatchData::new(arrow_schema, column_blocks, options)?,
})
}
}
diff --git a/src/components/parquet_ext/src/meta_data.rs
b/src/components/parquet_ext/src/meta_data.rs
index 00a0bb3a..ad18a36c 100644
--- a/src/components/parquet_ext/src/meta_data.rs
+++ b/src/components/parquet_ext/src/meta_data.rs
@@ -19,9 +19,10 @@ use std::{ops::Range, sync::Arc};
use async_trait::async_trait;
use bytes::Bytes;
+use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use generic_error::GenericResult;
use parquet::{
- arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder},
+ arrow::arrow_reader::ArrowReaderOptions,
errors::{ParquetError, Result},
file::{footer, metadata::ParquetMetaData},
};
diff --git a/src/components/parquet_ext/src/prune/min_max.rs
b/src/components/parquet_ext/src/prune/min_max.rs
index 8ea39299..0a717021 100644
--- a/src/components/parquet_ext/src/prune/min_max.rs
+++ b/src/components/parquet_ext/src/prune/min_max.rs
@@ -230,7 +230,7 @@ mod test {
}
fn prepare_parquet_schema_descr(schema: &ArrowSchema) -> SchemaDescPtr {
- let mut fields = schema
+ let fields = schema
.fields()
.iter()
.map(|field| {
@@ -245,7 +245,7 @@ mod test {
})
.collect();
let schema = SchemaType::group_type_builder("schema")
- .with_fields(&mut fields)
+ .with_fields(fields)
.build()
.unwrap();
diff --git a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index feba491f..dd430f52 100644
--- a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -129,8 +129,10 @@ impl ExecutionPlan for UnresolvedPartitionedScan {
))
}
- fn statistics(&self) -> Statistics {
- Statistics::default()
+ fn statistics(
+ &self,
+ ) -> Result<datafusion::common::Statistics,
datafusion::error::DataFusionError> {
+ Ok(Statistics::new_unknown(&self.schema()))
}
}
@@ -367,8 +369,10 @@ impl ExecutionPlan for ResolvedPartitionedScan {
Ok(Box::pin(record_stream))
}
- fn statistics(&self) -> Statistics {
- Statistics::default()
+ fn statistics(
+ &self,
+ ) -> Result<datafusion::common::Statistics,
datafusion::error::DataFusionError> {
+ Ok(Statistics::new_unknown(&self.schema()))
}
fn metrics(&self) -> Option<MetricsSet> {
@@ -578,8 +582,10 @@ impl ExecutionPlan for UnresolvedSubTableScan {
))
}
- fn statistics(&self) -> Statistics {
- Statistics::default()
+ fn statistics(
+ &self,
+ ) -> Result<datafusion::common::Statistics,
datafusion::error::DataFusionError> {
+ Ok(Statistics::new_unknown(&self.schema()))
}
}
diff --git a/src/df_engine_extensions/src/dist_sql_query/test_util.rs
b/src/df_engine_extensions/src/dist_sql_query/test_util.rs
index 1f4e788f..c42f9e38 100644
--- a/src/df_engine_extensions/src/dist_sql_query/test_util.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -490,8 +490,10 @@ impl ExecutionPlan for MockScan {
unimplemented!()
}
- fn statistics(&self) -> datafusion::physical_plan::Statistics {
- unimplemented!()
+ fn statistics(&self) -> DfResult<datafusion::physical_plan::Statistics> {
+ Ok(datafusion::physical_plan::Statistics::new_unknown(
+ &self.schema(),
+ ))
}
}
diff --git a/src/df_operator/src/scalar.rs b/src/df_operator/src/scalar.rs
index 1535ebdb..58e8214c 100644
--- a/src/df_operator/src/scalar.rs
+++ b/src/df_operator/src/scalar.rs
@@ -31,6 +31,7 @@ pub struct ScalarUdf {
}
impl ScalarUdf {
+ #[allow(deprecated)]
pub fn create(name: &str, func: ScalarFunction) -> Self {
let signature = func.signature().to_datafusion_signature();
let return_type = func.return_type().to_datafusion_return_type();
@@ -43,7 +44,7 @@ impl ScalarUdf {
#[inline]
pub fn name(&self) -> &str {
- &self.df_udf.name
+ self.df_udf.name()
}
/// Convert into datafusion's udf
diff --git a/src/df_operator/src/udaf.rs b/src/df_operator/src/udaf.rs
index 448a26c6..44f39136 100644
--- a/src/df_operator/src/udaf.rs
+++ b/src/df_operator/src/udaf.rs
@@ -31,6 +31,7 @@ pub struct AggregateUdf {
}
impl AggregateUdf {
+ #[allow(deprecated)]
pub fn create(name: &str, func: AggregateFunction) -> Self {
let signature = func.signature().to_datafusion_signature();
let return_type = func.return_type().to_datafusion_return_type();
@@ -50,7 +51,7 @@ impl AggregateUdf {
#[inline]
pub fn name(&self) -> &str {
- &self.df_udaf.name
+ self.df_udaf.name()
}
#[inline]
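The same story applies to both UDF wrappers above: the `name` field on `ScalarUDF`/`AggregateUDF` is no longer public, and the constructors this code still calls are deprecated (hence the `#[allow(deprecated)]`). A hedged sketch, assuming `df_udf` is a `datafusion::logical_expr::ScalarUDF`:

    use datafusion::logical_expr::ScalarUDF;

    // Field access becomes an accessor call.
    fn udf_name(df_udf: &ScalarUDF) -> &str {
        df_udf.name() // was: &df_udf.name
    }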
diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs
index cac5af0c..cc455b3f 100644
--- a/src/interpreters/src/insert.rs
+++ b/src/interpreters/src/insert.rs
@@ -374,5 +374,5 @@ fn get_or_extract_column_from_row_groups(
Ok(columnar_value)
})?;
- Ok(column.into_array(num_rows))
+ column.into_array(num_rows).context(DatafusionExecutor)
}
diff --git a/src/interpreters/src/tests.rs b/src/interpreters/src/tests.rs
index 6d521738..f9c8c75b 100644
--- a/src/interpreters/src/tests.rs
+++ b/src/interpreters/src/tests.rs
@@ -117,7 +117,7 @@ where
.enable_partition_table_access(enable_partition_table_access)
.build();
let sql= format!("CREATE TABLE IF NOT EXISTS {table_name}(c1 string
tag not null,ts timestamp not null, c3 string, timestamp key(ts),primary
key(c1, ts)) \
- ENGINE=Analytic WITH
(ttl='70d',update_mode='overwrite',arena_block_size='1KB')");
+ ENGINE=Analytic WITH
(enable_ttl='false',update_mode='overwrite',arena_block_size='1KB')");
let output = self.sql_to_output_with_context(&sql, ctx).await?;
assert!(
@@ -157,7 +157,7 @@ where
.enable_partition_table_access(enable_partition_table_access)
.build();
let sql = format!("select * from {table_name}");
- let output = self.sql_to_output_with_context(&sql, ctx).await?;
+ let output = self.sql_to_output_with_context(&sql, ctx.clone()).await?;
let records = output.try_into().unwrap();
let expected = vec![
"+------------+---------------------+--------+--------+------------+--------------+",
@@ -169,15 +169,15 @@ where
];
test_util::assert_record_batches_eq(&expected, records);
- let sql = "select count(*) from test_table";
- let output = self.sql_to_output(sql).await?;
+ let sql = format!("select count(*) from {table_name}");
+ let output = self.sql_to_output_with_context(&sql, ctx).await?;
let records = output.try_into().unwrap();
let expected = vec![
- "+-----------------+",
- "| COUNT(UInt8(1)) |",
- "+-----------------+",
- "| 2 |",
- "+-----------------+",
+ "+----------+",
+ "| COUNT(*) |",
+ "+----------+",
+ "| 2 |",
+ "+----------+",
];
test_util::assert_record_batches_eq(&expected, records);
diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs
index 1c999ad0..673b6131 100644
--- a/src/proxy/src/grpc/prom_query.rs
+++ b/src/proxy/src/grpc/prom_query.rs
@@ -471,7 +471,7 @@ mod tests {
let schema = build_schema();
let record_schema = schema.to_record_schema();
let column_blocks = build_column_block();
- let record_batch = RecordBatch::new(record_schema,
column_blocks).unwrap();
+ let record_batch = RecordBatch::new(record_schema, column_blocks,
4).unwrap();
let column_name = ColumnNames {
timestamp: "timestamp".to_string(),
diff --git a/src/proxy/src/influxdb/types.rs b/src/proxy/src/influxdb/types.rs
index 117b5cf3..488f5ded 100644
--- a/src/proxy/src/influxdb/types.rs
+++ b/src/proxy/src/influxdb/types.rs
@@ -744,7 +744,7 @@ mod tests {
fn test_influxql_result() {
let record_schema = build_test_record_schema();
let column_blocks = build_test_column_blocks();
- let record_batch = RecordBatch::new(record_schema,
column_blocks).unwrap();
+ let record_batch = RecordBatch::new(record_schema, column_blocks,
7).unwrap();
let mut builder = InfluxqlResultBuilder::new(record_batch.schema(),
0).unwrap();
builder.add_record_batch(record_batch).unwrap();
diff --git a/src/query_engine/src/datafusion_impl/mod.rs
b/src/query_engine/src/datafusion_impl/mod.rs
index 48e42c21..482628f8 100644
--- a/src/query_engine/src/datafusion_impl/mod.rs
+++ b/src/query_engine/src/datafusion_impl/mod.rs
@@ -137,7 +137,7 @@ impl DfContextBuilder {
        // Using the default logical optimizer; to add more custom rules, use
        // `add_optimizer_rule`.
- let state = SessionState::with_config_rt(df_session_config,
self.runtime_env.clone());
- SessionContext::with_state(state)
+ let state = SessionState::new_with_config_rt(df_session_config,
self.runtime_env.clone());
+ SessionContext::new_with_state(state)
}
}
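These constructor renames recur in `reorder_memtable.rs` above and `logical_optimizer/mod.rs` below; the old `with_config_rt`/`with_state` names are deprecated in this DataFusion version. A runnable sketch of the new spelling:

    use std::sync::Arc;

    use datafusion::execution::{
        context::{SessionConfig, SessionContext, SessionState},
        runtime_env::RuntimeEnv,
    };

    fn main() {
        let state =
            SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()));
        let _ctx = SessionContext::new_with_state(state);
    }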
diff --git
a/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs
b/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs
index c963c75f..d1406a75 100644
--- a/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs
+++ b/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs
@@ -21,7 +21,9 @@ use std::sync::Arc;
use datafusion::{
config::ConfigOptions,
- physical_optimizer::{optimizer::PhysicalOptimizerRule,
repartition::Repartition},
+ physical_optimizer::{
+ enforce_distribution::EnforceDistribution,
optimizer::PhysicalOptimizerRule,
+ },
physical_plan::ExecutionPlan,
};
use logger::debug;
@@ -34,7 +36,7 @@ pub struct RepartitionAdapter {
impl Adapter for RepartitionAdapter {
fn may_adapt(original_rule: OptimizeRuleRef) -> OptimizeRuleRef {
- if original_rule.name() == Repartition::new().name() {
+ if original_rule.name() == EnforceDistribution::new().name() {
Arc::new(Self { original_rule })
} else {
original_rule
diff --git
a/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
b/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
index a5a6161c..3b1a0cd9 100644
--- a/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
+++ b/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
@@ -37,7 +37,7 @@ use common_types::{
time::{TimeRange, Timestamp},
};
use datafusion::{
- error::{DataFusionError, Result as ArrowResult},
+ error::{DataFusionError, Result as DataFusionResult},
execution::context::TaskContext,
physical_expr::PhysicalSortExpr,
physical_plan::{
@@ -93,15 +93,15 @@ impl PhysicalExpr for ExtractTsidExpr {
self
}
- fn data_type(&self, _input_schema: &ArrowSchema) -> ArrowResult<DataType> {
+ fn data_type(&self, _input_schema: &ArrowSchema) ->
DataFusionResult<DataType> {
Ok(DataType::UInt64)
}
- fn nullable(&self, _input_schema: &ArrowSchema) -> ArrowResult<bool> {
+ fn nullable(&self, _input_schema: &ArrowSchema) -> DataFusionResult<bool> {
Ok(false)
}
- fn evaluate(&self, batch: &RecordBatch) -> ArrowResult<ColumnarValue> {
+ fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue>
{
let tsid_idx = batch
.schema()
.index_of(TSID_COLUMN)
@@ -116,7 +116,7 @@ impl PhysicalExpr for ExtractTsidExpr {
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn PhysicalExpr>>,
- ) -> ArrowResult<Arc<dyn PhysicalExpr>> {
+ ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
Ok(self)
}
@@ -204,7 +204,7 @@ impl ExecutionPlan for PromAlignExec {
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
- ) -> ArrowResult<Arc<dyn ExecutionPlan>> {
+ ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
match children.len() {
1 => Ok(Arc::new(PromAlignExec {
input: children[0].clone(),
@@ -222,7 +222,7 @@ impl ExecutionPlan for PromAlignExec {
&self,
partition: usize,
context: Arc<TaskContext>,
- ) -> ArrowResult<DfSendableRecordBatchStream> {
+ ) -> DataFusionResult<DfSendableRecordBatchStream> {
debug!("PromAlignExec: partition:{}", partition);
Ok(Box::pin(PromAlignReader {
input: self.input.execute(partition, context)?,
@@ -236,9 +236,9 @@ impl ExecutionPlan for PromAlignExec {
}))
}
- fn statistics(&self) -> Statistics {
+ fn statistics(&self) -> DataFusionResult<Statistics> {
// TODO(chenxiang)
- Statistics::default()
+ Ok(Statistics::new_unknown(&self.schema()))
}
}
diff --git a/src/query_engine/src/datafusion_impl/task_context.rs
b/src/query_engine/src/datafusion_impl/task_context.rs
index aee98128..d1ea667d 100644
--- a/src/query_engine/src/datafusion_impl/task_context.rs
+++ b/src/query_engine/src/datafusion_impl/task_context.rs
@@ -40,7 +40,6 @@ use df_engine_extensions::dist_sql_query::{
};
use futures::future::BoxFuture;
use generic_error::BoxError;
-use prost::Message;
use runtime::Priority;
use snafu::ResultExt;
use table_engine::{
@@ -116,7 +115,7 @@ impl Preprocessor {
ctx: &Context,
) -> Result<Arc<dyn ExecutionPlan>> {
// Decode to datafusion physical plan.
- let protobuf = protobuf::PhysicalPlanNode::decode(encoded_plan)
+ let protobuf = protobuf::PhysicalPlanNode::try_decode(encoded_plan)
.box_err()
.with_context(|| ExecutorWithCause {
msg: Some("failed to decode plan".to_string()),
diff --git a/src/query_frontend/src/influxql/planner.rs
b/src/query_frontend/src/influxql/planner.rs
index 3b21228a..ed8d9c14 100644
--- a/src/query_frontend/src/influxql/planner.rs
+++ b/src/query_frontend/src/influxql/planner.rs
@@ -57,7 +57,7 @@ struct InfluxQLSchemaProvider<'a, P: MetaProvider> {
impl<'a, P: MetaProvider> SchemaProvider for InfluxQLSchemaProvider<'a, P> {
fn get_table_provider(&self, name: &str) ->
datafusion::error::Result<Arc<dyn TableSource>> {
self.context_provider
- .get_table_provider(name.into())
+ .get_table_source(name.into())
.map_err(|e| {
DataFusionError::Plan(format!(
"measurement does not exist, measurement:{name},
source:{e}"
diff --git a/src/query_frontend/src/logical_optimizer/mod.rs
b/src/query_frontend/src/logical_optimizer/mod.rs
index 4d62e877..8f2bf42a 100644
--- a/src/query_frontend/src/logical_optimizer/mod.rs
+++ b/src/query_frontend/src/logical_optimizer/mod.rs
@@ -30,7 +30,8 @@ use datafusion::{
use type_conversion::TypeConversion;
pub fn optimize_plan(plan: &LogicalPlan) -> Result<LogicalPlan> {
- let state = SessionState::with_config_rt(SessionConfig::new(),
Arc::new(RuntimeEnv::default()));
+ let state =
+ SessionState::new_with_config_rt(SessionConfig::new(),
Arc::new(RuntimeEnv::default()));
let state = register_analyzer_rules(state);
// Register iox optimizers, used by influxql.
let state =
influxql_query::logical_optimizer::register_iox_logical_optimizers(state);
diff --git a/src/query_frontend/src/logical_optimizer/type_conversion.rs
b/src/query_frontend/src/logical_optimizer/type_conversion.rs
index 89f0a14e..0aeaaba2 100644
--- a/src/query_frontend/src/logical_optimizer/type_conversion.rs
+++ b/src/query_frontend/src/logical_optimizer/type_conversion.rs
@@ -30,7 +30,7 @@ use datafusion::{
logical_expr::{
expr::{Expr, InList},
logical_plan::{Filter, LogicalPlan, TableScan},
- utils, Between, BinaryExpr, ExprSchemable, Operator,
+ Between, BinaryExpr, ExprSchemable, Operator,
},
optimizer::analyzer::AnalyzerRule,
scalar::ScalarValue,
@@ -113,17 +113,18 @@ impl AnalyzerRule for TypeConversion {
.map(|plan| self.analyze(plan.clone(), config))
.collect::<Result<Vec<_>>>()?;
- let expr = plan
+ let exprs = plan
.expressions()
.into_iter()
.map(|e| e.rewrite(&mut rewriter))
.collect::<Result<Vec<_>>>()?;
- Ok(utils::from_plan(&plan, &expr, &new_inputs)?)
+ Ok(LogicalPlan::with_new_exprs(&plan, exprs, &new_inputs)?)
}
LogicalPlan::Subquery(_)
| LogicalPlan::Statement { .. }
| LogicalPlan::SubqueryAlias(_)
+ | LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
}
@@ -209,7 +210,7 @@ impl<'a> TypeRewriter<'a> {
}
}
- let array = value.to_array();
+ let array = value.to_array()?;
ScalarValue::try_from_array(
&compute::cast(&array,
data_type).map_err(DataFusionError::ArrowError)?,
// index: Converts a value in `array` at `index` into a ScalarValue
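Two changes show up in this file: `logical_expr::utils::from_plan` is gone, with plan reconstruction now a method on `LogicalPlan`, and `ScalarValue::to_array` is fallible. A hedged sketch of the replacement call, mirroring the hunk above:

    use datafusion::error::Result;
    use datafusion::logical_expr::LogicalPlan;
    use datafusion::prelude::Expr;

    // Rebuild `plan` with rewritten expressions over re-analyzed inputs.
    fn rebuild(plan: &LogicalPlan, exprs: Vec<Expr>, new_inputs: &[LogicalPlan]) -> Result<LogicalPlan> {
        plan.with_new_exprs(exprs, new_inputs)
    }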
diff --git a/src/query_frontend/src/parser.rs b/src/query_frontend/src/parser.rs
index e01c4d03..23efa0ad 100644
--- a/src/query_frontend/src/parser.rs
+++ b/src/query_frontend/src/parser.rs
@@ -352,7 +352,7 @@ impl<'a> Parser<'a> {
is_dictionary = true;
}
}
- if c.data_type != DataType::String && is_dictionary {
+ if !matches!(c.data_type, DataType::String(_)) && is_dictionary {
return parser_err!(format!(
"Only string column can be dictionary encoded: {:?}",
c.to_string()
@@ -1001,7 +1001,7 @@ mod tests {
let columns = vec![
make_column_def("c1", DataType::Timestamp(None,
TimezoneInfo::None)),
make_column_def("c2", DataType::Double),
- make_column_def("c3", DataType::String),
+ make_column_def("c3", DataType::String(None)),
];
let sql = "CREATE TABLE mytbl(c1 timestamp, c2 double, c3 string,)
ENGINE = XX";
@@ -1027,7 +1027,7 @@ mod tests {
let columns = vec![
make_column_def("c1", DataType::Timestamp(None,
TimezoneInfo::None)),
make_comment_column_def("c2", DataType::Double, "id".to_string()),
- make_comment_column_def("c3", DataType::String,
"name".to_string()),
+ make_comment_column_def("c3", DataType::String(None),
"name".to_string()),
];
let sql = "CREATE TABLE mytbl(c1 timestamp, c2 double comment 'id', c3
string comment 'name',) ENGINE = XX";
@@ -1053,7 +1053,7 @@ mod tests {
let columns = vec![
make_column_def("c1", DataType::Timestamp(None,
TimezoneInfo::None)),
make_column_def("c2", DataType::Timestamp(None,
TimezoneInfo::None)),
- make_column_def("c3", DataType::String),
+ make_column_def("c3", DataType::String(None)),
make_column_def("c4", DataType::Double),
];
@@ -1253,7 +1253,7 @@ mod tests {
table_name: make_table_name("t"),
columns: vec![
make_column_def("c1", DataType::Double),
- make_column_def("c2", DataType::String),
+ make_column_def("c2", DataType::String(None)),
],
});
expect_parse_ok(sql, expected).unwrap();
@@ -1277,7 +1277,7 @@ mod tests {
table_name: make_table_name("t"),
columns: vec![
make_column_def("c1", DataType::Double),
- make_tag_column_def("c2", DataType::String),
+ make_tag_column_def("c2", DataType::String(None)),
],
});
expect_parse_ok(sql, expected).unwrap();
@@ -1287,7 +1287,7 @@ mod tests {
let sql = "ALTER TABLE t ADD COLUMN c1 string tag";
let expected = Statement::AlterAddColumn(AlterAddColumn {
table_name: make_table_name("t"),
- columns: vec![make_tag_column_def("c1", DataType::String)],
+ columns: vec![make_tag_column_def("c1",
DataType::String(None))],
});
expect_parse_ok(sql, expected).unwrap();
}
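The sqlparser bundled with this DataFusion release gives `DataType::String` an optional length payload, so every construction and match site gains a `(None)`/`(_)`, as the hunks above show. A hedged sketch:

    use sqlparser::ast::DataType;

    fn main() {
        // Construction now supplies the (here unused) optional length...
        let dt = DataType::String(None);
        // ...and matching must destructure it.
        assert!(matches!(dt, DataType::String(_)));
    }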
diff --git a/src/query_frontend/src/promql/convert.rs
b/src/query_frontend/src/promql/convert.rs
index 297e7161..f364a0b1 100644
--- a/src/query_frontend/src/promql/convert.rs
+++ b/src/query_frontend/src/promql/convert.rs
@@ -24,7 +24,7 @@ use common_types::{
use datafusion::{
logical_expr::{
avg, count,
- expr::{Alias, ScalarUDF},
+ expr::{Alias, ScalarFunction},
lit,
logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
max, min, sum, Expr as DataFusionExpr,
@@ -316,11 +316,10 @@ impl Expr {
// TSID is lost after aggregate, but PromAlignNode
need a unique id, so
// mock UUID as tsid based on groupby keys
DataFusionExpr::Alias(Alias {
- expr: Box::new(DataFusionExpr::ScalarUDF(ScalarUDF
{
- fun:
Arc::new(create_unique_id(tag_exprs.len())),
- args: tag_exprs.clone(),
- })),
+ expr: Box::new(DataFusionExpr::ScalarFunction(
+
ScalarFunction::new_udf(Arc::new(create_unique_id(tag_exprs.len())),
tag_exprs.clone()))),
name: TSID_COLUMN.to_string(),
+ relation: None,
});
let mut projection = tag_exprs.clone();
projection.extend(vec![
@@ -371,6 +370,7 @@ impl Expr {
Ok(DataFusionExpr::Alias(Alias {
expr: Box::new(expr),
name: alias,
+ relation: None,
}))
}
}
@@ -578,7 +578,7 @@ impl Selector {
.context(TableNotFound { name: &table })?;
let table_provider = meta_provider
- .get_table_provider(table_ref.table.name().into())
+ .get_table_source(table_ref.table.name().into())
.context(TableProviderNotFound { name: &table })?;
let schema =
Schema::try_from(table_provider.schema()).context(BuildTableSchema)?;
let timestamp_column_name = schema.timestamp_name().to_string();
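Two expression-tree changes appear above: UDF calls are now built with `ScalarFunction::new_udf` wrapped in `Expr::ScalarFunction` (the dedicated `Expr::ScalarUDF` shape is gone), and `Alias` gained a `relation` field. A hedged sketch against the pinned DataFusion revision, assuming `udf: Arc<ScalarUDF>` and `args: Vec<Expr>`:

    use std::sync::Arc;

    use datafusion::logical_expr::{
        expr::{Alias, ScalarFunction},
        Expr, ScalarUDF,
    };

    fn aliased_udf_call(udf: Arc<ScalarUDF>, args: Vec<Expr>) -> Expr {
        let call = Expr::ScalarFunction(ScalarFunction::new_udf(udf, args));
        Expr::Alias(Alias {
            expr: Box::new(call),
            relation: None, // new in this release: an optional qualifier
            name: "tsid".to_string(),
        })
    }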
diff --git a/src/query_frontend/src/promql/remote.rs
b/src/query_frontend/src/promql/remote.rs
index c687b51d..c3c1439e 100644
--- a/src/query_frontend/src/promql/remote.rs
+++ b/src/query_frontend/src/promql/remote.rs
@@ -64,7 +64,7 @@ pub fn remote_query_to_plan<P: MetaProvider>(
let (metric, field, mut filters) = normalize_matchers(query.matchers)?;
let table_provider = meta_provider
- .get_table_provider(TableReference::bare(&metric))
+ .get_table_source(TableReference::bare(&metric))
.context(TableProviderNotFound { name: &metric })?;
let schema =
Schema::try_from(table_provider.schema()).context(BuildTableSchema)?;
let timestamp_col_name = schema.timestamp_name();
diff --git a/src/query_frontend/src/provider.rs
b/src/query_frontend/src/provider.rs
index 4380829f..64647254 100644
--- a/src/query_frontend/src/provider.rs
+++ b/src/query_frontend/src/provider.rs
@@ -320,7 +320,7 @@ impl<'a, P: MetaProvider> MetaProvider for
ContextProviderAdapter<'a, P> {
}
impl<'a, P: MetaProvider> ContextProvider for ContextProviderAdapter<'a, P> {
- fn get_table_provider(
+ fn get_table_source(
&self,
name: TableReference,
) -> std::result::Result<Arc<(dyn TableSource + 'static)>,
DataFusionError> {
diff --git a/src/table_engine/src/memory.rs b/src/table_engine/src/memory.rs
index 68967705..20cfe583 100644
--- a/src/table_engine/src/memory.rs
+++ b/src/table_engine/src/memory.rs
@@ -260,7 +260,7 @@ fn row_group_to_record_batch(
column_blocks.push(column_block);
}
- RecordBatch::new(record_schema.clone(), column_blocks)
+ RecordBatch::new(record_schema.clone(), column_blocks, rows.num_rows())
.box_err()
.context(ErrWithSource {
msg: "failed to create RecordBatch",
diff --git a/src/table_engine/src/predicate.rs
b/src/table_engine/src/predicate.rs
index 723724f3..b316b99e 100644
--- a/src/table_engine/src/predicate.rs
+++ b/src/table_engine/src/predicate.rs
@@ -329,6 +329,8 @@ impl<'a> TimeRangeExtractor<'a> {
| Operator::BitwiseAnd
| Operator::BitwiseOr
| Operator::BitwiseXor
+ | Operator::AtArrow
+ | Operator::ArrowAt
| Operator::BitwiseShiftRight
| Operator::BitwiseShiftLeft
| Operator::StringConcat => TimeRange::min_to_max(),
@@ -427,20 +429,18 @@ impl<'a> TimeRangeExtractor<'a> {
| Expr::IsUnknown(_)
| Expr::IsNotUnknown(_)
| Expr::Negative(_)
+ | Expr::AggregateUDF(_)
| Expr::Case { .. }
| Expr::Cast { .. }
| Expr::TryCast { .. }
| Expr::Sort { .. }
| Expr::ScalarFunction { .. }
- | Expr::ScalarUDF { .. }
| Expr::AggregateFunction { .. }
| Expr::WindowFunction { .. }
- | Expr::AggregateUDF { .. }
| Expr::Wildcard { .. }
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)
- | Expr::QualifiedWildcard { .. }
| Expr::GroupingSet(_)
| Expr::GetIndexedField { .. }
| Expr::OuterReferenceColumn { .. }
diff --git a/src/table_engine/src/provider.rs b/src/table_engine/src/provider.rs
index d5e4c69f..bcca5ba8 100644
--- a/src/table_engine/src/provider.rs
+++ b/src/table_engine/src/provider.rs
@@ -19,6 +19,7 @@
use std::{
any::Any,
+ collections::HashSet,
fmt,
sync::{Arc, Mutex},
time::{Duration, Instant},
@@ -35,8 +36,10 @@ use datafusion::{
logical_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType},
physical_expr::PhysicalSortExpr,
physical_plan::{
+ expressions,
metrics::{Count, MetricValue, MetricsSet},
- DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning,
+ projection::ProjectionExec,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning,
PhysicalExpr,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
};
@@ -230,9 +233,34 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
priority,
);
+ let mut need_reprojection = false;
+ let all_projections = if let Some(proj) = projection {
+ let mut original_projections = proj.clone();
+ let projections_from_filter =
+ collect_projection_from_expr(filters,
&self.current_table_schema);
+ for proj in projections_from_filter {
+ if !original_projections.contains(&proj) {
+ original_projections.push(proj);
+                    // If the projection from the filters has columns not in
the original projection,
+                    // we need to add them to the projection, and add a
ProjectionExec plan to restore
+                    // the original columns. E.g.:
+                    // ```text
+                    // select a from table where b > 1
+                    // ```
+                    // The original projection only contains a, but the filter
has column b, so we
+                    // need to query both the a and b columns from the table
but only
+                    // output the a column. More details can be found in:
+ //
https://github.com/apache/arrow-datafusion/pull/9131#pullrequestreview-1865020767
+ need_reprojection = true;
+ }
+ }
+ Some(original_projections)
+ } else {
+ None
+ };
let predicate = self.check_and_build_predicate_from_filters(filters);
let projected_schema =
- ProjectedSchema::new(self.current_table_schema.clone(),
projection.cloned()).map_err(
+ ProjectedSchema::new(self.current_table_schema.clone(),
all_projections).map_err(
|e| {
DataFusionError::Internal(format!(
"Invalid projection, plan:{self:?},
projection:{projection:?}, err:{e:?}"
@@ -240,6 +268,22 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
},
)?;
+ let projection_exprs = if need_reprojection {
+ let original_projection = projection.unwrap();
+ let exprs = (0..original_projection.len())
+ .map(|i| {
+ let column = projected_schema.target_column_schema(i);
+ (
+ Arc::new(expressions::Column::new(&column.name, i))
+ as Arc<dyn PhysicalExpr>,
+ column.name.clone(),
+ )
+ })
+ .collect::<Vec<_>>();
+ Some(exprs)
+ } else {
+ None
+ };
let opts = ReadOptions {
deadline,
read_parallelism,
@@ -256,7 +300,13 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
priority,
};
- self.builder.build(request).await
+ let scan = self.builder.build(request).await?;
+ if let Some(expr) = projection_exprs {
+ let plan = ProjectionExec::try_new(expr, scan)?;
+ Ok(Arc::new(plan))
+ } else {
+ Ok(scan)
+ }
}
fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) ->
PredicateRef {
@@ -410,7 +460,7 @@ impl ExecutionPlan for ScanTable {
// However, we have no inputs here, so `UnknownPartitioning` is
suitable.
// In datafusion, always set it to `UnknownPartitioning` in the scan
plan, for
// example:
https://github.com/apache/arrow-datafusion/blob/cf152af6515f0808d840e1fe9c63b02802595826/datafusion/core/src/datasource/physical_plan/csv.rs#L175
- Partitioning::UnknownPartitioning(self.parallelism)
+ Partitioning::UnknownPartitioning(self.parallelism.max(1))
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
@@ -467,9 +517,12 @@ impl ExecutionPlan for ScanTable {
Some(metric_set)
}
- fn statistics(&self) -> Statistics {
+ fn statistics(
+ &self,
+ ) -> std::result::Result<datafusion::common::Statistics,
datafusion::error::DataFusionError>
+ {
// TODO(yingwen): Implement this
- Statistics::default()
+ Ok(Statistics::new_unknown(&self.schema()))
}
}
@@ -477,10 +530,11 @@ impl DisplayAs for ScanTable {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
write!(
f,
- "ScanTable: table={}, parallelism={}, priority={:?}",
+ "ScanTable: table={}, parallelism={}, priority={:?},
partition_count={:?}",
self.table.name(),
self.request.opts.read_parallelism,
- self.request.priority
+ self.request.priority,
+ self.output_partitioning()
)
}
}
@@ -495,3 +549,16 @@ impl fmt::Debug for ScanTable {
.finish()
}
}
+
+fn collect_projection_from_expr(exprs: &[Expr], schema: &Schema) ->
HashSet<usize> {
+ let mut projections = HashSet::new();
+ exprs.iter().for_each(|expr| {
+ for col_name in visitor::find_columns_by_expr(expr) {
+ if let Some(idx) = schema.index_of(&col_name) {
+ projections.insert(idx);
+ }
+ }
+ });
+
+ projections
+}
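A condensed, hedged sketch of what the new reprojection path above does: the scan is widened with filter-only columns, then wrapped in a `ProjectionExec` that outputs just the columns the query asked for. Here `scan` and the column names are assumed inputs, with the original projection occupying the first `names.len()` output columns:

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::{
        expressions::Column, projection::ProjectionExec, ExecutionPlan, PhysicalExpr,
    };

    // Project the originally requested columns back out by index, dropping
    // the extra columns that were fetched only to evaluate filters.
    fn reproject(scan: Arc<dyn ExecutionPlan>, names: &[String]) -> Result<Arc<dyn ExecutionPlan>> {
        let exprs = names
            .iter()
            .enumerate()
            .map(|(i, name)| {
                (
                    Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
                    name.clone(),
                )
            })
            .collect::<Vec<_>>();
        Ok(Arc::new(ProjectionExec::try_new(exprs, scan)?))
    }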
diff --git a/src/table_engine/src/table.rs b/src/table_engine/src/table.rs
index 7365ca66..3c611b43 100644
--- a/src/table_engine/src/table.rs
+++ b/src/table_engine/src/table.rs
@@ -421,6 +421,7 @@ impl fmt::Debug for ReadRequest {
.field("projected", &projected)
.field("predicate", &predicate)
.field("priority", &self.priority)
+ .field("projected_schema", &self.projected_schema)
.finish()
}
}