This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f1bb4088d [KYUUBI #6504] Improve docs for Solution for Large Query Results
f1bb4088d is described below
commit f1bb4088d3211ac4d2cd8f116f6ec37a19de0001
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Jun 26 21:45:34 2024 +0800
[KYUUBI #6504] Improve docs for Solution for Large Query Results
# :mag: Description
Update the outdated docs to mention new features:
1. Arrow serialization
2. Save query results into files
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
Docs changes, review.
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6504 from pan3793/docs-large-result.
Closes #6504
77043bd4d [Cheng Pan] fix
e8ba1bac1 [Cheng Pan] grammar
198d00e33 [Cheng Pan] grammar
a37f631eb [Cheng Pan] Improve docs for Solution for Large Query Results
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/spark/incremental_collection.md | 120 -------------
docs/deployment/spark/index.rst                 |   2 +-
docs/deployment/spark/large_query_results.md    | 219 ++++++++++++++++++++++++
3 files changed, 220 insertions(+), 121 deletions(-)
diff --git a/docs/deployment/spark/incremental_collection.md b/docs/deployment/spark/incremental_collection.md
deleted file mode 100644
index 1210e331a..000000000
--- a/docs/deployment/spark/incremental_collection.md
+++ /dev/null
@@ -1,120 +0,0 @@
-<!--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--->
-
-# Solution for Big Result Sets
-
-Typically, when a user submits a SELECT query to Spark SQL engine, the Driver calls `collect` to trigger calculation and
-collect the entire data set of all tasks(a.k.a. partitions of an RDD), after all partitions data arrived, then the
-client pulls the result set from the Driver through the Kyuubi Server in small batch.
-
-Therefore, the bottleneck is the Spark Driver for a query with a big result set. To avoid OOM, Spark has a configuration
-`spark.driver.maxResultSize` which default is `1g`, you should enlarge it as well as `spark.driver.memory` if your
-query has result set in several GB. But what if the result set size is dozens GB or event hundreds GB? It would be best
-if you have incremental collection mode.
-
-## Incremental collection
-
-Since v1.4.0-incubating, Kyuubi supports incremental collection mode, it is a solution for big result sets. This feature
-is disabled in default, you can turn on it by setting the configuration `kyuubi.operation.incremental.collect` to `true`.
-
-The incremental collection changes the gather method from `collect` to `toLocalIterator`. `toLocalIterator` is a Spark
-action that sequentially submits Jobs to retrieve partitions. As each partition is retrieved, the client through pulls
-the result set from the Driver through the Kyuubi Server streamingly. It reduces the Driver memory significantly from
-the size of the complete result set to the maximum partition.
-
-The incremental collection is not the silver bullet, you should turn it on carefully, because it can significantly hurt
-performance. And even in incremental collection mode, when multiple queries execute concurrently, each query still requires
-one partition of data in Driver memory. Therefore, it is still important to control the number of concurrent queries to
-avoid OOM.
-
-## Use in single connections
-
-As above explains, the incremental collection mode is not suitable for common query sense, you can enable incremental
-collection mode for specific queries by using
-
-```
-kyuubi-beeline -u 'jdbc:kyuubi://kyuubi:10009/?spark.driver.maxResultSize=8g;spark.driver.memory=12g#kyuubi.engine.share.level=CONNECTION;kyuubi.operation.incremental.collect=true' \
- --incremental=true \
- -f big_result_query.sql
-```
-
-`--incremental=true` is required for kyuubi-beeline client, otherwise, the entire result sets is fetched and buffered before
-being displayed, which may cause client side OOM.
-
-## Change incremental collection mode in session
-
-The configuration `kyuubi.operation.incremental.collect` can also be changed using `SET` in session.
-
-```
-~ kyuubi-beeline -u 'jdbc:kyuubi://localhost:10009'
-Connected to: Apache Kyuubi (version 1.9.0)
-
-0: jdbc:kyuubi://localhost:10009/> set kyuubi.operation.incremental.collect=true;
-+---------------------------------------+--------+
-|                  key                  | value  |
-+---------------------------------------+--------+
-| kyuubi.operation.incremental.collect  | true   |
-+---------------------------------------+--------+
-1 row selected (0.039 seconds)
-
-0: jdbc:kyuubi://localhost:10009/> select /*+ REPARTITION(5) */ * from range(1, 10);
-+-----+
-| id  |
-+-----+
-| 2   |
-| 6   |
-| 7   |
-| 0   |
-| 5   |
-| 3   |
-| 4   |
-| 1   |
-| 8   |
-| 9   |
-+-----+
-10 rows selected (1.929 seconds)
-
-0: jdbc:kyuubi://localhost:10009/> set kyuubi.operation.incremental.collect=false;
-+---------------------------------------+--------+
-|                  key                  | value  |
-+---------------------------------------+--------+
-| kyuubi.operation.incremental.collect  | false  |
-+---------------------------------------+--------+
-1 row selected (0.027 seconds)
-
-0: jdbc:kyuubi://localhost:10009/> select /*+ REPARTITION(5) */ * from range(1, 10);
-+-----+
-| id  |
-+-----+
-| 2   |
-| 6   |
-| 7   |
-| 0   |
-| 5   |
-| 3   |
-| 4   |
-| 1   |
-| 8   |
-| 9   |
-+-----+
-10 rows selected (0.128 seconds)
-```
-
-From the Spark UI, we can see that in incremental collection mode, the query produces 5 jobs (in red square), and in
-normal mode, only produces 1 job (in blue square).
-
-
diff --git a/docs/deployment/spark/index.rst b/docs/deployment/spark/index.rst
index acaaa6ec5..237bdd7aa 100644
--- a/docs/deployment/spark/index.rst
+++ b/docs/deployment/spark/index.rst
@@ -29,5 +29,5 @@ Even if you don't use Kyuubi, as a simple Spark user, I'm sure you'll find the n
dynamic_allocation
aqe
- incremental_collection
+ large_query_results
gluten
diff --git a/docs/deployment/spark/large_query_results.md b/docs/deployment/spark/large_query_results.md
new file mode 100644
index 000000000..a2b07a557
--- /dev/null
+++ b/docs/deployment/spark/large_query_results.md
@@ -0,0 +1,219 @@
+<!--
+- Licensed to the Apache Software Foundation (ASF) under one or more
+- contributor license agreements. See the NOTICE file distributed with
+- this work for additional information regarding copyright ownership.
+- The ASF licenses this file to You under the Apache License, Version 2.0
+- (the "License"); you may not use this file except in compliance with
+- the License. You may obtain a copy of the License at
+-
+- http://www.apache.org/licenses/LICENSE-2.0
+-
+- Unless required by applicable law or agreed to in writing, software
+- distributed under the License is distributed on an "AS IS" BASIS,
+- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- See the License for the specific language governing permissions and
+- limitations under the License.
+-->
+
+# Solution for Large Query Results
+
+By default, when you submit a query to the Spark SQL engine, the Spark driver triggers the calculation of the
+result RDD and then collects the entire query result from all partitions into memory. The query is marked as
+completed after the data of all partitions has arrived, and the client then pulls the result set from the Spark
+driver through the Kyuubi Server in small batches.
+
+Therefore, for queries with a large result set, the major bottleneck is the Spark driver. To avoid OOM, Spark
+provides the configuration `spark.driver.maxResultSize`, whose default value is `1g`. You should enlarge it, as
+well as `spark.driver.memory`, if you are going to run a query whose result set reaches several GiB.
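+
+If the limit is exceeded, the query fails on the Spark driver with an error like the following (a sketch; the task
+count and sizes are illustrative, and the exact wording varies across Spark versions):
+
+```
+org.apache.spark.SparkException: Job aborted due to stage failure:
+Total size of serialized results of 64 tasks (1029.1 MiB) is bigger than
+spark.driver.maxResultSize (1024.0 MiB)
+```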
+
+You may notice that even a simple query like `SELECT * FROM table_1g` requires a value larger than `1g` for
+`spark.driver.maxResultSize`. This is because the data is serialized in different formats on HDFS/S3 and in
+Spark memory: data persisted in columnar formats like Parquet/ORC is generally much smaller than the space it
+occupies in memory.
+
+So what if the result set size is dozens or even hundreds of GiB? The following sections may help you.
+
+## Incremental collect
+
+Since v1.4.0-incubating, Kyuubi supports incremental collect mode. This feature is disabled by default; you can
+turn it on by setting `kyuubi.operation.incremental.collect` to `true`.
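+
+For example, to turn it on for all engines by default, the property can be placed in
+`$KYUUBI_HOME/conf/kyuubi-defaults.conf` (a minimal sketch; per-connection and per-session overrides are shown below):
+
+```
+# kyuubi-defaults.conf
+kyuubi.operation.incremental.collect=true
+```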
+
+You may know that RDD (Resilient Distributed Dataset), a core concept of Spark, is a fault-tolerant collection
+of elements that can be operated on in parallel. The key idea here is to calculate the partitions of the result
+RDD sequentially instead of all at once.
+
+The incremental collect changes the gather method from `collect` to `toLocalIterator`. Unlike `collect`, which
+triggers a single job to calculate the whole result RDD in parallel and collects the entire result set into memory,
+`toLocalIterator` sequentially submits jobs to calculate and retrieve the partitions of the RDD one by one. The
+query is marked as completed once the first partition of the result RDD is retrieved, and the client then pulls the
+result set from the Spark driver through the Kyuubi Server in small batches. After a partition is fully consumed,
+the Spark driver releases its memory and starts to calculate the next partition, and so on. This significantly
+reduces the Spark driver memory consumption from the whole result RDD to the largest single partition.
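+
+Because the largest partition bounds the Spark driver memory footprint in this mode, you can shrink it with a
+repartition hint on the query itself (a sketch; the table name and partition count are illustrative):
+
+```
+-- split the result into more, smaller partitions before incremental retrieval
+SELECT /*+ REPARTITION(100) */ * FROM large_table;
+```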
+
+The incremental collect is not a silver bullet; turn it on carefully, because sequential execution can significantly
+hurt performance. And even in incremental collect mode, when multiple queries execute concurrently, each query still
+keeps one partition of data in Spark driver memory. Therefore, it is still important to control the number of
+concurrent queries to avoid OOM.
+
+The incremental collect requires no changes on the client side, so all existing clients can benefit from this feature.
+
+### Retrieve large query results in an isolated engine
+
+As explained above, the incremental collect mode is not suitable for typical queries, but you can enable it for
+specific large-result queries in an isolated engine, for example:
+
+```
+kyuubi-beeline -u 'jdbc:kyuubi://kyuubi:10009/' \
+ --hiveconf spark.driver.maxResultSize=8g \
+ --hiveconf spark.driver.memory=12g \
+ --hiveconf kyuubi.engine.share.level=CONNECTION \
+ --hiveconf kyuubi.operation.incremental.collect=true \
+ --incremental=true \
+ -f large_result_query.sql
+```
+
+For Hive BeeLine prior to 2.3.0, `--incremental=true` is required; otherwise, the entire result set is fetched and
+buffered in memory before being displayed, which may cause an OOM in the client process. See HIVE-7224 for more details.
+
+### Change incremental collect mode in session
+
+The configuration `kyuubi.operation.incremental.collect` can also be changed within a session using `SET`.
+
+```
+~ kyuubi-beeline -u 'jdbc:kyuubi://localhost:10009'
+Connected to: Apache Kyuubi (version 1.9.0)
+
+0: jdbc:kyuubi://localhost:10009/> set kyuubi.operation.incremental.collect=true;
++---------------------------------------+--------+
+|                  key                  | value  |
++---------------------------------------+--------+
+| kyuubi.operation.incremental.collect  | true   |
++---------------------------------------+--------+
+1 row selected (0.039 seconds)
+
+0: jdbc:kyuubi://localhost:10009/> select /*+ REPARTITION(5) */ * from range(1, 10);
++-----+
+| id  |
++-----+
+| 2   |
+| 6   |
+| 7   |
+| 0   |
+| 5   |
+| 3   |
+| 4   |
+| 1   |
+| 8   |
+| 9   |
++-----+
+10 rows selected (1.929 seconds)
+
+0: jdbc:kyuubi://localhost:10009/> set kyuubi.operation.incremental.collect=false;
++---------------------------------------+--------+
+|                  key                  | value  |
++---------------------------------------+--------+
+| kyuubi.operation.incremental.collect  | false  |
++---------------------------------------+--------+
+1 row selected (0.027 seconds)
+
+0: jdbc:kyuubi://localhost:10009/> select /*+ REPARTITION(5) */ * from range(1, 10);
++-----+
+| id  |
++-----+
+| 2   |
+| 6   |
+| 7   |
+| 0   |
+| 5   |
+| 3   |
+| 4   |
+| 1   |
+| 8   |
+| 9   |
++-----+
+10 rows selected (0.128 seconds)
+```
+
+On the Spark UI, you can see that in incremental collect mode, the query produces 5 jobs (in the red square), while
+in full collect mode it produces only 1 job (in the blue square).
+
+
+
+## Arrow serialization
+
+Since v1.7.0, Kyuubi supports an optimized query result serialization format that uses [Apache Arrow](https://arrow.apache.org/).
+This feature is disabled by default; you can turn it on by setting `kyuubi.operation.result.format` to `arrow`.
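+
+This property can also be toggled per session via `SET`, like the incremental collect switch shown above (a sketch;
+whether a session-level override takes effect may depend on your Kyuubi version):
+
+```
+0: jdbc:kyuubi://localhost:10009/> set kyuubi.operation.result.format=arrow;
+```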
+
+The following diagrams show the data flow of query result serialization in the `thrift` and `arrow` formats.
+
+Thrift serialization: the Spark driver collects the result set from the executors in Spark `InternalRow` format,
+converts it to Spark `Row` format, and then converts it to Thrift `TRowSet` format, which causes both high CPU and
+memory load on the Spark driver. The result set is sent to the client through the Kyuubi Server in Thrift `TRowSet`
+format, and is finally converted to Java types on the client.
+
+```
+Spark Executor ==> Spark Driver ==> Kyuubi Server ==> Client
+(Spark InternalRow)
+                   (Spark Row)
+                   (Thrift TRowSet)
+                                    (Thrift TRowSet)
+                                                      (Java Types)
+```
+
+Arrow serialization: the Spark executors convert the result set from Spark `InternalRow` format to Arrow
+`RecordBatch` and send the `RecordBatch` to the Spark driver; the result set is then sent to the client through the
+Kyuubi Server in Arrow `RecordBatch` format without conversion, and is finally converted to Java types on the client.
+
+The main benefits of Arrow serialization are:
+
+1. the format conversion happens on the executors in parallel
+2. no format conversion happens on the Spark driver
+3. less time is spent on format conversion than with thrift serialization
+
+```
+Spark Executor ==> Spark Driver ==> Kyuubi Server ==> Client
+(Spark InternalRow)
+(Arrow RecordBatch)
+                   (Arrow RecordBatch)
+                                    (Arrow RecordBatch)
+                                                      (Java Types)
+```
+
+Arrow serialization is orthogonal to incremental collect; you can combine these two features to achieve lower
+memory consumption and better query result retrieval performance.
+
+```
+kyuubi-beeline -u 'jdbc:kyuubi://kyuubi:10009/' \
+ --hiveconf kyuubi.operation.result.format=arrow \
+ --hiveconf kyuubi.operation.incremental.collect=true \
+ -f large_result_query.sql
+```
+
+Note: Arrow serialization requires the client to support deserializing Arrow format results. You must use the Kyuubi
+JDBC driver v1.7.0 or higher; third-party clients such as the Apache Hive JDBC driver and PyHive do not work.
+
+## Save query results into files
+
+Since v1.9.0, the Kyuubi Spark SQL engine supports saving query results into files instead of collecting them into
+Spark driver memory before they are retrieved by the client. This dramatically reduces the memory pressure on the
+Spark driver, and no changes are required on the client side, which means all existing clients can benefit from
+this feature.
+
+This feature is disabled by default; you can turn it on by setting `kyuubi.operation.result.saveToFile.enabled`
+to `true`.
+
+Saving query results into files requires a directory to store the result files. Before enabling this feature, you
+must set a temporary directory for staging files by configuring `kyuubi.operation.result.saveToFile.dir`; it is
+typically an HDFS directory or S3 bucket, and both the Spark driver and executors must be granted read and write
+permissions on it. Normally, each query result is cleaned up after the query operation is closed; for abnormally
+exited Spark applications, you may need to clean up manually or leverage storage system features like TTL to handle
+leftover files.
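+
+For example, the feature can be enabled server-wide in `$KYUUBI_HOME/conf/kyuubi-defaults.conf` (a minimal sketch;
+the staging directory path is illustrative and must be writable in your environment):
+
+```
+# kyuubi-defaults.conf
+kyuubi.operation.result.saveToFile.enabled=true
+kyuubi.operation.result.saveToFile.dir=hdfs:///tmp/kyuubi/query_results
+```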
+
+For small query results (smaller than `kyuubi.operation.result.saveToFile.minSize`, default 200MiB), saving them as
+files may not help, so Kyuubi collects the result into Spark driver memory directly. Note that Kyuubi uses Spark
+internal methods to estimate the query result size, and the estimate is sometimes inaccurate.
+
+```
+kyuubi-beeline -u 'jdbc:kyuubi://kyuubi:10009/' \
+ --hiveconf kyuubi.operation.result.saveToFile.enabled=true \
+ -f large_result_query.sql
+```
+
+This feature conflicts with the above two features; please don't enable them at the same time.