aokolnychyi opened a new pull request #105: Basic Benchmarks for Iceberg Spark Data Source
URL: https://github.com/apache/incubator-iceberg/pull/105

**_Note!_** This PR is based on a [change](https://github.com/apache/incubator-iceberg/pull/63) that has not been merged yet.

**_Note!_** The results were measured at commit 65ffbc4.

This PR contains benchmarks for the Iceberg Spark data source. The main goal is to compare its performance to the built-in file source in Spark.

## Results

### Full Materialized Scans

This is the worst case for Iceberg as it cannot use any of its indexing capabilities.

#### Flat Data

First, we evaluate a data set with a flat schema. See `SparkParquetFlatDataBenchmark` for its schema.

```
Benchmark                                                       Mode  Cnt  Score   Error  Units
SparkParquetFlatDataReadBenchmark.readFileSourceNonVectorized     ss    5  6.524 ± 0.216   s/op
SparkParquetFlatDataReadBenchmark.readFileSourceVectorized        ss    5  2.673 ± 0.134   s/op
SparkParquetFlatDataReadBenchmark.readIceberg                     ss    5  6.282 ± 0.287   s/op
```

As we can see, Iceberg can be more than 2 times slower in this case. The main reason is the absence of vectorized execution. If we disable vectorization, Iceberg slightly outperforms the built-in file source (Iceberg classes are used to read Parquet).

If we add a projection, we get the following results:

```
Benchmark                                                                    Mode  Cnt  Score   Error  Units
SparkParquetFlatDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5  1.080 ± 0.077   s/op
SparkParquetFlatDataReadBenchmark.readWithProjectionFileSourceVectorized       ss    5  0.401 ± 0.088   s/op
SparkParquetFlatDataReadBenchmark.readWithProjectionIceberg                    ss    5  1.181 ± 0.032   s/op
```

#### Nested Data

Now we evaluate a data set with nested data. See `SparkParquetNestedDataBenchmark` for its schema.
```
Benchmark                                                         Mode  Cnt  Score   Error  Units
SparkParquetNestedDataReadBenchmark.readFileSourceNonVectorized     ss    5  4.038 ± 0.163   s/op
SparkParquetNestedDataReadBenchmark.readFileSourceVectorized        ss    5  4.176 ± 0.120   s/op
SparkParquetNestedDataReadBenchmark.readIceberg                     ss    5  2.869 ± 0.141   s/op
```

Iceberg performs better on nested data. The built-in file source in Spark does not support vectorized execution for nested data. Moreover, there is an issue in Spark 2.4.0: if we don't disable vectorized execution, the generated code becomes more complicated and Spark might have to reconstruct each row (run the benchmark yourself to see the details). I did not manage to reproduce this issue on master. I believe [this Spark change](https://github.com/apache/spark/pull/23127/files) could fix it.

If we add a projection on a nested column, we get the following results:

```
Benchmark                                                                       Mode  Cnt  Score   Error  Units
SparkParquetNestedDataReadBenchmark.readWithProjectionFileSourceNonVectorized     ss    5  1.321 ± 0.084   s/op
SparkParquetNestedDataReadBenchmark.readWithProjectionFileSourceVectorized        ss    5  1.341 ± 0.045   s/op
SparkParquetNestedDataReadBenchmark.readWithProjectionIceberg                     ss    5  2.351 ± 0.146   s/op
```

Unfortunately, Spark does not ask Iceberg to prune the schema down to the nested field. Nested schema pruning is quite recent in Spark: it is available only for Parquet and is disabled by default. This is not a problem on the Iceberg side; the problem is that Spark does not request a properly pruned schema from arbitrary data sources.

### File Skipping Capabilities

This benchmark uses 500 files with 10000 records each, where each file corresponds to a particular date, and a query whose predicate matches only 1 file out of 500. The goal is to evaluate the file skipping capabilities of Iceberg.
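The file-skipping idea can be sketched in plain Java. This is a toy model, not Iceberg's actual API: all class and method names here are illustrative. Each file carries min/max statistics for the filter column, and a file is opened only if the predicate value can fall inside its range.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of statistics-based file skipping. All names are illustrative;
// this is not Iceberg's actual API.
public class FileSkippingSketch {

  // A data file with min/max statistics for one column (e.g. an epoch day).
  static final class DataFile {
    final String path;
    final long min;
    final long max;

    DataFile(String path, long min, long max) {
      this.path = path;
      this.min = min;
      this.max = max;
    }
  }

  // Keep only files whose [min, max] range can contain the queried value;
  // all other files are skipped without being opened.
  static List<DataFile> filesMatching(List<DataFile> files, long value) {
    List<DataFile> matched = new ArrayList<>();
    for (DataFile f : files) {
      if (f.min <= value && value <= f.max) {
        matched.add(f);
      }
    }
    return matched;
  }

  public static void main(String[] args) {
    // 500 files, one distinct "date" per file, as in the benchmark setup.
    List<DataFile> files = new ArrayList<>();
    for (long day = 0; day < 500; day++) {
      files.add(new DataFile("file-" + day + ".parquet", day, day));
    }
    // A point predicate on the date prunes the scan down to a single file.
    System.out.println(filesMatching(files, 42L).size()); // prints 1
  }
}
```

With per-file statistics like these, a scan touching 1 date out of 500 can plan a single task instead of 500, which is what the flat-data numbers below show.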
#### Flat Data

```
Benchmark                                                                   Mode  Cnt  Score   Error  Units
SparkParquetFlatDataFilterBenchmark.readWithFilterFileSourceNonVectorized     ss    5  1.590 ± 0.338   s/op
SparkParquetFlatDataFilterBenchmark.readWithFilterFileSourceVectorized        ss    5  1.541 ± 0.251   s/op
SparkParquetFlatDataFilterBenchmark.readWithFilterIceberg                     ss    5  0.067 ± 0.007   s/op
```

To be fair, the difference is so big because our configuration forces Spark to process only one file per task. Hence, we have only 1 task with Iceberg compared to 500 tasks with the file source. Another reason is that Iceberg does not have to read the other 499 files at all.

#### Nested Data

```
Benchmark                                                                     Mode  Cnt  Score   Error  Units
SparkParquetNestedDataFilterBenchmark.readWithFilterFileSourceNonVectorized     ss    5  3.396 ± 0.387   s/op
SparkParquetNestedDataFilterBenchmark.readWithFilterFileSourceVectorized        ss    5  3.416 ± 0.351   s/op
SparkParquetNestedDataFilterBenchmark.readWithFilterIceberg                     ss    5  5.634 ± 0.397   s/op
```

As of now, Iceberg does not compute min/max values for nested columns. Consequently, it cannot skip files based on predicates on nested fields. This is tracked in [this issue](https://github.com/apache/incubator-iceberg/issues/78).

TODO: figure out why Iceberg is still slower than the file source here. The number of records is small, so the performance difference most likely comes from somewhere else.

### Writing Performance

This benchmark measures the time to write 5000000 records in Parquet format with gzip compression.

#### Flat Data

```
Benchmark                                           Mode  Cnt   Score   Error  Units
SparkParquetFlatDataWriteBenchmark.writeFileSource    ss    5  16.313 ± 0.570   s/op
SparkParquetFlatDataWriteBenchmark.writeIceberg       ss    5  16.115 ± 0.382   s/op
```

The performance is almost identical on flat data.

#### Nested Data

```
Benchmark                                             Mode  Cnt  Score   Error  Units
SparkParquetNestedDataWriteBenchmark.writeFileSource    ss    5  8.702 ± 0.431   s/op
SparkParquetNestedDataWriteBenchmark.writeIceberg       ss    5  8.331 ± 0.231   s/op
```

As before, Iceberg outperforms the built-in file source on nested data.
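A note on the `Vectorized`/`NonVectorized` file-source variants throughout the results above: Spark's vectorized Parquet reader is controlled by a session config. Whether the benchmarks toggle exactly this flag is my assumption; it is the standard way to switch between the two code paths:

```
# Hypothetical reproduction of the two file-source modes (not necessarily
# how the benchmarks wire it up):
spark.sql.parquet.enableVectorizedReader=false   # NonVectorized variants
spark.sql.parquet.enableVectorizedReader=true    # Vectorized variants (Spark default)
```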
## Open Questions

### Ways to run

[The recommended way](https://openjdk.java.net/projects/code-tools/jmh/) to run JMH benchmarks is via Maven or Gradle. However, we can still try integrating with JUnit if needed.

### JMH plugin config

One option is to have a separate JMH config in each subproject inside `build.gradle`. As the configuration is identical, I decided to use an if statement in `subprojects`. The if condition will be extended with `iceberg-hive` and `iceberg-core` for benchmarks that cover `HiveTables` and `HadoopTables`.

### Configuring output path & include regex

Right now, this is done via `gradle.properties`. If there are any objections, we can move this logic directly into `build.gradle`.

### Represent `=> Unit` in Java

I did not find a suitable built-in Java class to represent Scala's `=> Unit`. Therefore, I created a functional interface called `Action`. Let me know if there is a better alternative.

### Materializing Datasets

One way to materialize Datasets in Java:

```
ds.foreach(record -> {});
```

This approach computes the underlying `rdd` as follows:

```
lazy val rdd: RDD[T] = {
  val objectType = exprEnc.deserializer.dataType
  rddQueryExecution.toRdd.mapPartitions { rows =>
    rows.map(_.get(0, objectType).asInstanceOf[T])
  }
}
```

The current way looks a bit ugly:

```
ds.queryExecution().toRdd().toJavaRDD().foreach(record -> {});
```

However, it should be more efficient, as it computes the underlying `rdd` as follows:

```
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
```
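For reference, an `Action` interface of the kind mentioned above can be a plain zero-argument functional interface. This is a sketch of the shape, not necessarily the PR's exact definition (it might, for example, declare a checked exception); the `timeMillis` helper is purely illustrative:

```java
// A zero-argument, void-returning functional interface, standing in for
// Scala's by-name `=> Unit`. Sketch only; the PR's definition may differ.
@FunctionalInterface
interface Action {
  void invoke();
}

public class ActionExample {

  // Illustrative helper in the style of "measure this block of work".
  static long timeMillis(Action action) {
    long start = System.currentTimeMillis();
    action.invoke();
    return System.currentTimeMillis() - start;
  }

  public static void main(String[] args) {
    // The call site reads like passing a by-name block in Scala.
    long elapsed = timeMillis(() -> {
      long sum = 0;
      for (int i = 0; i < 1_000_000; i++) {
        sum += i;
      }
    });
    System.out.println(elapsed >= 0); // prints true
  }
}
```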
