This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6b86244834fc [SPARK-52078][TEST] Add ZStandardTPCDSDataBenchmark
6b86244834fc is described below
commit 6b86244834fcc589aac60260beb10061b744831a
Author: Cheng Pan <[email protected]>
AuthorDate: Thu May 15 13:18:16 2025 +0800
[SPARK-52078][TEST] Add ZStandardTPCDSDataBenchmark
### What changes were proposed in this pull request?
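While upgrading zstd-jni from 1.5.6-10 to 1.5.7-x in https://github.com/apache/spark/pull/50057, we found some implausible benchmark results, and the author suggested using real-world data for the zstd compression benchmark.
This PR adds `ZStandardTPCDSDataBenchmark`, which measures zstd compression, decompression, and multi-worker compression on the TPC-DS `catalog_sales.dat` text file (SF=1), and generates that text data in the benchmark GitHub Actions workflow.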
### Why are the changes needed?
To benchmark zstd against more realistic, real-world data.
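Compression speed and ratio depend heavily on the input, which is why the choice of benchmark data matters. The sketch below is an illustration only. It swaps in the JDK's `java.util.zip.Deflater` for zstd, so no zstd-jni dependency is assumed, and it compares a highly repetitive synthetic buffer with a pseudo-random one. The object and helper names are hypothetical.

```scala
import java.util.zip.Deflater
import scala.util.Random

object InputSensitivitySketch {
  // Compress `input` with Deflater (a stand-in for zstd here) and return the compressed size.
  def deflatedSize(input: Array[Byte], level: Int = Deflater.DEFAULT_COMPRESSION): Int = {
    val deflater = new Deflater(level)
    try {
      deflater.setInput(input)
      deflater.finish()
      val buf = new Array[Byte](64 * 1024)
      var total = 0
      // deflate() returns how many compressed bytes it wrote into `buf`.
      while (!deflater.finished()) {
        total += deflater.deflate(buf)
      }
      total
    } finally {
      deflater.end()
    }
  }

  // Synthetic input: the byte pattern 0, 1, ..., 255 repeated, which is trivially compressible.
  def repetitive(size: Int): Array[Byte] = Array.tabulate(size)(i => (i & 0xFF).toByte)

  // Pseudo-random input with a fixed seed: essentially incompressible.
  def pseudoRandom(size: Int, seed: Long = 42L): Array[Byte] = {
    val bytes = new Array[Byte](size)
    new Random(seed).nextBytes(bytes)
    bytes
  }

  def main(args: Array[String]): Unit = {
    val size = 1 << 20
    println(s"repetitive:    ${deflatedSize(repetitive(size))} bytes")
    println(s"pseudo-random: ${deflatedSize(pseudoRandom(size))} bytes")
  }
}
```

Real text data such as `catalog_sales.dat` falls somewhere between these two extremes.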
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested on a local machine running Ubuntu 24.04 with an Intel(R) Core(TM) i5-9500 CPU 3.00GHz.
zstd-jni:1.5.6-10
```
================================================================================================
Benchmark ZStandardCompressionCodec
================================================================================================
OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Benchmark ZStandardCompressionCodec:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------
Compression 4 times at level 1 without buffer pool           2737           2742           6          0.0   684299199.3       1.0X
Compression 4 times at level 2 without buffer pool           4217           4218           2          0.0  1054165072.5       0.6X
Compression 4 times at level 3 without buffer pool           5660           5661           2          0.0  1414928809.8       0.5X
Compression 4 times at level 1 with buffer pool              2739           2743           6          0.0   684719746.2       1.0X
Compression 4 times at level 2 with buffer pool              4186           4191           8          0.0  1046477235.5       0.7X
Compression 4 times at level 3 with buffer pool              5663           5667           5          0.0  1415762083.2       0.5X
OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------
Decompression 4 times from level 1 without buffer pool            943            950          10          0.0   235749387.0       1.0X
Decompression 4 times from level 2 without buffer pool           1239           1244           6          0.0   309753079.0       0.8X
Decompression 4 times from level 3 without buffer pool           1468           1484          23          0.0   366946390.8       0.6X
Decompression 4 times from level 1 with buffer pool               933            942           9          0.0   233286880.8       1.0X
Decompression 4 times from level 2 with buffer pool              1142           1171          40          0.0   285605190.0       0.8X
Decompression 4 times from level 3 with buffer pool              1394           1404          13          0.0   348546518.3       0.7X
OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers                1889           1899          14          0.0   472156817.0       1.0X
Parallel Compression with 1 workers                1715           1717           2          0.0   428826617.0       1.1X
Parallel Compression with 2 workers                 904            906           2          0.0   225890052.0       2.1X
Parallel Compression with 4 workers                 539            548           8          0.0   134735732.5       3.5X
Parallel Compression with 8 workers                 540            548           9          0.0   134889447.5       3.5X
Parallel Compression with 16 workers                577            589          23          0.0   144182540.7       3.3X
OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers                9555           9567          18          0.0  2388642623.3       1.0X
Parallel Compression with 1 workers                7973           8006          47          0.0  1993145509.0       1.2X
Parallel Compression with 2 workers                5070           5071           1          0.0  1267405763.3       1.9X
Parallel Compression with 4 workers                4420           4421           1          0.0  1104977620.3       2.2X
Parallel Compression with 8 workers                4790           4800          15          0.0  1197417939.0       2.0X
Parallel Compression with 16 workers               5000           5003           5          0.0  1249965510.5       1.9X
```
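The `Per Row(ns)` and `Relative` columns are derived from the measured times. The sketch below reproduces them, assuming (consistently with the numbers above) that `Per Row(ns)` is the best time divided by the `N = 4` values per iteration that the benchmark passes to `Benchmark`, and that `Relative` is the first case's best time divided by the current case's best time. The helper names are hypothetical. Because `Best Time(ms)` is printed rounded to whole milliseconds, recomputed per-row values only approximately match the printed ones: 2737 ms / 4 gives 684250000 ns, versus the printed 684299199.3.

```scala
import java.util.Locale

object BenchmarkColumnsSketch {
  // Nanoseconds per value, where each timed iteration counts as `valuesPerIteration` values.
  def perRowNs(bestMs: Double, valuesPerIteration: Int): Double =
    bestMs * 1e6 / valuesPerIteration

  // Speed relative to the baseline (first) case; below 1.0 means slower than the baseline.
  def relative(baselineBestMs: Double, bestMs: Double): Double = baselineBestMs / bestMs

  // Render like the "Relative" column, e.g. "0.6X" (Locale.ROOT keeps "." as the separator).
  def relativeLabel(baselineBestMs: Double, bestMs: Double): String =
    "%3.1fX".formatLocal(Locale.ROOT, relative(baselineBestMs, bestMs))

  def main(args: Array[String]): Unit = {
    val n = 4
    // Best Time(ms) of the level 1, 2 and 3 "without buffer pool" cases (zstd-jni 1.5.6-10).
    val best = Seq(2737.0, 4217.0, 5660.0)
    best.foreach { ms =>
      println(s"$ms ms -> ${perRowNs(ms, n)} ns/row, ${relativeLabel(best.head, ms)}")
    }
  }
}
```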
zstd-jni:1.5.7-3
```
================================================================================================
Benchmark ZStandardCompressionCodec
================================================================================================
OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Benchmark ZStandardCompressionCodec:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------
Compression 4 times at level 1 without buffer pool           2700           2709          13          0.0   674967564.0       1.0X
Compression 4 times at level 2 without buffer pool           4148           4149           0          0.0  1037124857.0       0.7X
Compression 4 times at level 3 without buffer pool           5660           5682          31          0.0  1414968620.0       0.5X
Compression 4 times at level 1 with buffer pool              2718           2728          14          0.0   679514554.3       1.0X
Compression 4 times at level 2 with buffer pool              4130           4131           2          0.0  1032476406.2       0.7X
Compression 4 times at level 3 with buffer pool              5571           5576           6          0.0  1392871057.5       0.5X
OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------
Decompression 4 times from level 1 without buffer pool            942            951           9          0.0   235523684.5       1.0X
Decompression 4 times from level 2 without buffer pool           1248           1270          31          0.0   311906360.5       0.8X
Decompression 4 times from level 3 without buffer pool           1472           1475           4          0.0   368071680.5       0.6X
Decompression 4 times from level 1 with buffer pool               939            956          18          0.0   234631810.0       1.0X
Decompression 4 times from level 2 with buffer pool              1249           1261          16          0.0   312318610.5       0.8X
Decompression 4 times from level 3 with buffer pool              1475           1475           0          0.0   368765939.3       0.6X
OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers                1865           1873          11          0.0   466278397.5       1.0X
Parallel Compression with 1 workers                1785           1793          10          0.0   446359936.8       1.0X
Parallel Compression with 2 workers                 945            953          10          0.0   236142005.8       2.0X
Parallel Compression with 4 workers                 559            577          29          0.0   139754505.5       3.3X
Parallel Compression with 8 workers                 537            555          13          0.0   134328778.3       3.5X
Parallel Compression with 16 workers                587            614          23          0.0   146784965.5       3.2X
OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers                9365           9375          14          0.0  2341247379.0       1.0X
Parallel Compression with 1 workers                8022           8022           0          0.0  2005448255.8       1.2X
Parallel Compression with 2 workers                5054           5069          22          0.0  1263445148.8       1.9X
Parallel Compression with 4 workers                4372           4394          31          0.0  1092926980.8       2.1X
Parallel Compression with 8 workers                4785           4805          28          0.0  1196282275.0       2.0X
Parallel Compression with 16 workers               5012           5028          23          0.0  1252925049.5       1.9X
```
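To compare the two zstd-jni runs case by case, the relative change in best time is a convenient summary. A minimal sketch, assuming the best times are copied by hand from the two tables above; the case labels and the `pctChange` helper are illustrative, not part of Spark. A negative value means zstd-jni 1.5.7-3 was faster in that case.

```scala
object VersionComparisonSketch {
  // Relative change of `candidateMs` versus `baselineMs`, in percent; negative means faster.
  def pctChange(baselineMs: Double, candidateMs: Double): Double =
    (candidateMs - baselineMs) / baselineMs * 100.0

  def main(args: Array[String]): Unit = {
    // Best Time(ms) copied from the tables above: (zstd-jni 1.5.6-10, zstd-jni 1.5.7-3).
    val cases = Seq(
      "Compression at level 3 without buffer pool" -> ((5660.0, 5660.0)),
      "Compression at level 3 with buffer pool" -> ((5663.0, 5571.0)),
      "Decompression from level 2 with buffer pool" -> ((1142.0, 1249.0)),
      "Parallel compression at level 9, 4 workers" -> ((4420.0, 4372.0)))
    cases.foreach { case (name, (oldMs, newMs)) =>
      println(f"$name%-45s ${pctChange(oldMs, newMs)}%+6.1f%%")
    }
  }
}
```

These are single runs on one machine, so small differences are best read alongside the `Stdev(ms)` column.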
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #50857 from pan3793/SPARK-52078.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.github/workflows/benchmark.yml | 21 ++--
.../ZStandardTPCDSDataBenchmark-jdk21-results.txt | 49 ++++++++
.../ZStandardTPCDSDataBenchmark-results.txt | 49 ++++++++
.../spark/io/ZStandardTPCDSDataBenchmark.scala | 123 +++++++++++++++++++++
4 files changed, 235 insertions(+), 7 deletions(-)
diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index 9bfe79cfa2fe..6b2e72b3f23b 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -66,8 +66,8 @@ jobs:
# Any TPC-DS related updates on this job need to be applied to tpcds-1g job of build_and_test.yml as well
tpcds-1g-gen:
- name: "Generate an input dataset for TPCDSQueryBenchmark with SF=1"
- if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, '*')
+ name: "Generate a TPC-DS dataset with SF=1"
+ if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*')
runs-on: ubuntu-latest
env:
SPARK_LOCAL_IP: localhost
@@ -98,7 +98,9 @@ jobs:
id: cache-tpcds-sf-1
uses: actions/cache@v4
with:
- path: ./tpcds-sf-1
+ path: |
+ ./tpcds-sf-1
+ ./tpcds-sf-1-text
key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
- name: Checkout tpcds-kit repository
if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
@@ -118,7 +120,9 @@ jobs:
java-version: ${{ inputs.jdk }}
- name: Generate TPC-DS (SF=1) table data
if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
- run: build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"
+ run: |
+ build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"
+ mkdir -p `pwd`/tpcds-sf-1-text && rm -f `pwd`/tpcds-sf-1-text/* && `pwd`/tpcds-kit/tools/dsdgen -DISTRIBUTIONS `pwd`/tpcds-kit/tools/tpcds.idx -SCALE 1 -DIR `pwd`/tpcds-sf-1-text
benchmark:
name: "Run benchmarks: ${{ inputs.class }} (JDK ${{ inputs.jdk }}, Scala ${{ inputs.scala }}, ${{ matrix.split }} out of ${{ inputs.num-splits }} splits)"
@@ -138,6 +142,7 @@ jobs:
# To prevent spark.test.home not being set. See more detail in SPARK-36007.
SPARK_HOME: ${{ github.workspace }}
SPARK_TPCDS_DATA: ${{ github.workspace }}/tpcds-sf-1
+ SPARK_TPCDS_DATA_TEXT: ${{ github.workspace }}/tpcds-sf-1-text
steps:
- name: Checkout Spark repository
uses: actions/checkout@v4
@@ -167,11 +172,13 @@ jobs:
distribution: zulu
java-version: ${{ inputs.jdk }}
- name: Cache TPC-DS generated data
- if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, '*')
+ if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*')
id: cache-tpcds-sf-1
uses: actions/cache@v4
with:
- path: ./tpcds-sf-1
+ path: |
+ ./tpcds-sf-1
+ ./tpcds-sf-1-text
key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
- name: Run benchmarks
run: |
@@ -188,7 +195,7 @@ jobs:
# To keep the directory structure and file permissions, tar them
# See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
echo "Preparing the benchmark results:"
- tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+ tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpcds-sf-1-text --exclude-standard`
- name: Upload benchmark results
uses: actions/upload-artifact@v4
with:
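The workflow exports the generated text data directory as `SPARK_TPCDS_DATA_TEXT`, and the benchmark reads it with `sys.env("SPARK_TPCDS_DATA_TEXT")`. When the variable is unset, for example when running locally outside this workflow, `sys.env(...)` throws `NoSuchElementException`. The sketch below shows one way a caller could resolve the input file with a clearer error. It is a hedged illustration, not part of this commit. The `resolveTpcdsTextFile` helper is hypothetical, and it takes the environment as a parameter so that it can be tested.

```scala
import java.nio.file.{Files, Path, Paths}

object TpcdsTextDataSketch {
  // Resolve <SPARK_TPCDS_DATA_TEXT>/<table>.dat, reporting a readable error instead of throwing.
  def resolveTpcdsTextFile(
      env: Map[String, String],
      table: String = "catalog_sales"): Either[String, Path] = {
    env.get("SPARK_TPCDS_DATA_TEXT") match {
      case None =>
        Left("SPARK_TPCDS_DATA_TEXT is not set; point it at a directory produced by dsdgen")
      case Some(dir) =>
        val file = Paths.get(dir, s"$table.dat")
        if (Files.isRegularFile(file)) Right(file) else Left(s"$file does not exist")
    }
  }

  def main(args: Array[String]): Unit = {
    resolveTpcdsTextFile(sys.env) match {
      case Right(path) => println(s"Using ${path.toAbsolutePath} (${Files.size(path)} bytes)")
      case Left(error) => System.err.println(error)
    }
  }
}
```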
diff --git a/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt b/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt
new file mode 100644
index 000000000000..49606c5ab280
--- /dev/null
+++ b/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt
@@ -0,0 +1,49 @@
+================================================================================================
+Benchmark ZStandardCompressionCodec
+================================================================================================
+
+OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
+AMD EPYC 7763 64-Core Processor
+Benchmark ZStandardCompressionCodec:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------
+Compression 4 times at level 1 without buffer pool           2539           2541           2          0.0   634832028.5       1.0X
+Compression 4 times at level 2 without buffer pool           4157           4188          44          0.0  1039277864.3       0.6X
+Compression 4 times at level 3 without buffer pool           6091           6095           5          0.0  1522781623.3       0.4X
+Compression 4 times at level 1 with buffer pool              2536           2540           5          0.0   634097186.3       1.0X
+Compression 4 times at level 2 with buffer pool              4147           4150           4          0.0  1036639857.0       0.6X
+Compression 4 times at level 3 with buffer pool              6097           6099           3          0.0  1524134426.0       0.4X
+
+OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
+AMD EPYC 7763 64-Core Processor
+Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------------
+Decompression 4 times from level 1 without buffer pool            886            902          23          0.0   221484611.2       1.0X
+Decompression 4 times from level 2 without buffer pool           1109           1130          30          0.0   277257788.3       0.8X
+Decompression 4 times from level 3 without buffer pool           1336           1359          32          0.0   334102921.8       0.7X
+Decompression 4 times from level 1 with buffer pool               858            868           9          0.0   214401966.0       1.0X
+Decompression 4 times from level 2 with buffer pool              1131           1140          12          0.0   282739707.3       0.8X
+Decompression 4 times from level 3 with buffer pool              1366           1375          12          0.0   341571527.0       0.6X
+
+OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                2030           2033           4          0.0   507451934.3       1.0X
+Parallel Compression with 1 workers                1879           1882           4          0.0   469750208.3       1.1X
+Parallel Compression with 2 workers                 969            976          10          0.0   242174332.5       2.1X
+Parallel Compression with 4 workers                 711            713           2          0.0   177820489.8       2.9X
+Parallel Compression with 8 workers                 847            898          53          0.0   211649152.3       2.4X
+Parallel Compression with 16 workers                848            859          10          0.0   211876140.0       2.4X
+
+OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                8266           8278          16          0.0  2066565583.8       1.0X
+Parallel Compression with 1 workers                6933           6941          10          0.0  1733356075.3       1.2X
+Parallel Compression with 2 workers                3690           3691           1          0.0   922481882.3       2.2X
+Parallel Compression with 4 workers                3223           3231          11          0.0   805643345.5       2.6X
+Parallel Compression with 8 workers                3652           3656           7          0.0   912916115.3       2.3X
+Parallel Compression with 16 workers               3912           3950          54          0.0   977901486.2       2.1X
+
+
diff --git a/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt b/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt
new file mode 100644
index 000000000000..d5091826fa9a
--- /dev/null
+++ b/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt
@@ -0,0 +1,49 @@
+================================================================================================
+Benchmark ZStandardCompressionCodec
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
+AMD EPYC 7763 64-Core Processor
+Benchmark ZStandardCompressionCodec:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------
+Compression 4 times at level 1 without buffer pool           2518           2519           1          0.0   629582183.5       1.0X
+Compression 4 times at level 2 without buffer pool           4111           4111           1          0.0  1027767031.5       0.6X
+Compression 4 times at level 3 without buffer pool           6146           6160          19          0.0  1536532700.3       0.4X
+Compression 4 times at level 1 with buffer pool              2517           2517           1          0.0   629208370.5       1.0X
+Compression 4 times at level 2 with buffer pool              4105           4112          11          0.0  1026190298.0       0.6X
+Compression 4 times at level 3 with buffer pool              6154           6157           5          0.0  1538378430.0       0.4X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
+AMD EPYC 7763 64-Core Processor
+Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------------
+Decompression 4 times from level 1 without buffer pool            900            903           4          0.0   225055920.0       1.0X
+Decompression 4 times from level 2 without buffer pool           1161           1163           3          0.0   290146657.0       0.8X
+Decompression 4 times from level 3 without buffer pool           1399           1406          10          0.0   349650877.8       0.6X
+Decompression 4 times from level 1 with buffer pool               899            901           2          0.0   224627803.0       1.0X
+Decompression 4 times from level 2 with buffer pool              1165           1166           1          0.0   291335735.3       0.8X
+Decompression 4 times from level 3 with buffer pool              1398           1401           4          0.0   349578394.0       0.6X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                2061           2067           8          0.0   515297811.0       1.0X
+Parallel Compression with 1 workers                1843           1844           1          0.0   460705797.3       1.1X
+Parallel Compression with 2 workers                 961            972          16          0.0   240177085.3       2.1X
+Parallel Compression with 4 workers                 729            731           2          0.0   182208026.2       2.8X
+Parallel Compression with 8 workers                 781            800          18          0.0   195212932.0       2.6X
+Parallel Compression with 16 workers                865            871           6          0.0   216145271.5       2.4X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                8557           8635         110          0.0  2139353975.8       1.0X
+Parallel Compression with 1 workers                7156           7193          52          0.0  1789023949.5       1.2X
+Parallel Compression with 2 workers                3855           3861           9          0.0   963635046.3       2.2X
+Parallel Compression with 4 workers                3248           3253           8          0.0   811889324.8       2.6X
+Parallel Compression with 8 workers                3667           3671           6          0.0   916671282.5       2.3X
+Parallel Compression with 16 workers               3799           3845          65          0.0   949757174.5       2.3X
+
+
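The parallel-compression tables report speedup relative to the 0-worker case, and dividing that speedup by the worker count gives a rough per-worker efficiency. In the JDK 17 level-3 results above, for example, speedup peaks at 4 workers (2.8X) and declines at 8 and 16 workers. A minimal sketch with hypothetical helper names, using the level-3 best times from that file:

```scala
object ParallelEfficiencySketch {
  // Speedup of a run versus the 0-worker (single-threaded) baseline.
  def speedup(baselineMs: Double, ms: Double): Double = baselineMs / ms

  // Speedup per worker thread; only defined for workers >= 1.
  def efficiency(baselineMs: Double, ms: Double, workers: Int): Double = {
    require(workers >= 1, "efficiency is undefined for 0 workers")
    speedup(baselineMs, ms) / workers
  }

  def main(args: Array[String]): Unit = {
    // Level-3 Best Time(ms) from the JDK 17 results file: (workers, ms).
    val baselineMs = 2061.0
    val runs = Seq(1 -> 1843.0, 2 -> 961.0, 4 -> 729.0, 8 -> 781.0, 16 -> 865.0)
    runs.foreach { case (workers, ms) =>
      println(f"$workers%2d workers: speedup ${speedup(baselineMs, ms)}%.2fx, " +
        f"efficiency ${efficiency(baselineMs, ms, workers)}%.2f")
    }
  }
}
```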
diff --git a/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala b/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala
new file mode 100644
index 000000000000..69820c15be30
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.io
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream, OutputStream}
+import java.nio.file.{Files, Paths}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_LEVEL, IO_COMPRESSION_ZSTD_WORKERS}
+
+/**
+ * Benchmark for ZStandard codec performance.
+ * {{{
+ *   To run this benchmark:
+ *   1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/ZStandardTPCDSDataBenchmark-results.txt".
+ * }}}
+ */
+object ZStandardTPCDSDataBenchmark extends BenchmarkBase {
+
+  val N = 4
+
+  // the size of TPCDS catalog_sales.dat (SF1) is about 283M
+  val data = Files.readAllBytes(Paths.get(sys.env("SPARK_TPCDS_DATA_TEXT"), "catalog_sales.dat"))
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark ZStandardCompressionCodec"
+    runBenchmark(name) {
+      val benchmark1 = new Benchmark(name, N, output = output)
+      compressionBenchmark(benchmark1, N)
+      benchmark1.run()
+
+      val benchmark2 = new Benchmark(name, N, output = output)
+      decompressionBenchmark(benchmark2, N)
+      benchmark2.run()
+      parallelCompressionBenchmark()
+    }
+  }
+
+  private def compressionBenchmark(benchmark: Benchmark, N: Int): Unit = {
+    Seq(false, true).foreach { enablePool =>
+      Seq(1, 2, 3).foreach { level =>
+        val conf = new SparkConf(false)
+          .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool)
+          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+        val condition = if (enablePool) "with" else "without"
+        benchmark.addCase(s"Compression $N times at level $level $condition buffer pool") { _ =>
+          (1 until N).foreach { _ =>
+            val os = new ZStdCompressionCodec(conf)
+              .compressedOutputStream(OutputStream.nullOutputStream())
+            os.write(data)
+            os.close()
+          }
+        }
+      }
+    }
+  }
+
+  private def decompressionBenchmark(benchmark: Benchmark, N: Int): Unit = {
+    Seq(false, true).foreach { enablePool =>
+      Seq(1, 2, 3).foreach { level =>
+        val conf = new SparkConf(false)
+          .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool)
+          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+        val outputStream = new ByteArrayOutputStream()
+        val out = new ZStdCompressionCodec(conf).compressedOutputStream(outputStream)
+        out.write(data)
+        out.close()
+        val bytes = outputStream.toByteArray
+
+        val condition = if (enablePool) "with" else "without"
+        benchmark.addCase(s"Decompression $N times from level $level $condition buffer pool") { _ =>
+          (1 until N).foreach { _ =>
+            val bais = new ByteArrayInputStream(bytes)
+            val is = new ZStdCompressionCodec(conf).compressedInputStream(bais)
+            is.readAllBytes()
+            is.close()
+          }
+        }
+      }
+    }
+  }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(
+        s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>
+        val conf = new SparkConf(false)
+          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+          .set(IO_COMPRESSION_ZSTD_WORKERS, workers)
+        benchmark.addCase(s"Parallel Compression with $workers workers") { _ =>
+          val os = OutputStream.nullOutputStream()
+          val zcos = new ZStdCompressionCodec(conf).compressedOutputStream(os)
+          val oos = new ObjectOutputStream(zcos)
+          1 to N foreach { _ =>
+            oos.writeObject(data)
+          }
+          oos.close()
+        }
+      }
+      benchmark.run()
+    }
+  }
+}
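When reading `parallelCompressionBenchmark`, keep one property of `java.io.ObjectOutputStream` in mind. Writing the same object instance again with `writeObject` does not re-serialize it; later calls emit a short back-reference handle instead. Writing `data` `N` times therefore feeds the compressor one copy of the array plus a few bytes per repeat. This is consistent with the level-3 `0 workers` best time (1889 ms in the zstd-jni 1.5.6-10 run) being about one third of the level-3 compression case without buffer pool (5660 ms), whose `(1 until N)` loop runs N - 1 = 3 times. The JDK-only sketch below demonstrates the behavior. `writeUnshared` and `reset()` are standard `ObjectOutputStream` methods that avoid the back-reference, while the object and mode names are illustrative.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializationSizeSketch {
  sealed trait WriteMode
  // writeObject on the same instance: repeats become back-reference handles
  case object Shared extends WriteMode
  // writeUnshared: the array contents are written every time
  case object Unshared extends WriteMode
  // writeObject followed by reset(): the handle table is cleared after each write
  case object ResetEachTime extends WriteMode

  // Serialize `data` `times` times into one ObjectOutputStream and return the total byte count.
  def serializedSize(data: Array[Byte], times: Int, mode: WriteMode): Int = {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    (1 to times).foreach { _ =>
      mode match {
        case Shared => oos.writeObject(data)
        case Unshared => oos.writeUnshared(data)
        case ResetEachTime =>
          oos.writeObject(data)
          oos.reset()
      }
    }
    oos.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val data = new Array[Byte](1 << 20) // 1 MiB payload standing in for the benchmark's `data`
    Seq(Shared, Unshared, ResetEachTime).foreach { mode =>
      println(s"$mode: ${serializedSize(data, 4, mode)} bytes")
    }
  }
}
```

If the intent were to compress `data` `N` times, calling `oos.reset()` after each `writeObject`, or using `writeUnshared`, would be one way to achieve that.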