aokolnychyi opened a new pull request #105: Basic Benchmarks for Iceberg Spark 
Data Source
URL: https://github.com/apache/incubator-iceberg/pull/105
 
 
   **_Note!_** This PR is based on [a change](https://github.com/apache/incubator-iceberg/pull/63) that has not been merged yet.
   **_Note!_** The results were measured at commit 65ffbc4.
   
   This PR contains benchmarks for the Iceberg Spark data source. The main goal 
is to compare its performance to the built-in file source in Spark.
   
   ## Results
   
   ### Full Materialized Scans
   
   This is the worst case for Iceberg as it cannot use any indexing 
capabilities.
   
   #### Flat Data 
   
   First, we evaluate a data set with a flat schema. See 
`SparkParquetFlatDataBenchmark` for its schema.
   
   ```
   Benchmark                                                      Mode  Cnt  Score   Error  Units
   SparkParquetFlatDataReadBenchmark.readFileSourceNonVectorized    ss    5  6.524 ± 0.216   s/op
   SparkParquetFlatDataReadBenchmark.readFileSourceVectorized       ss    5  2.673 ± 0.134   s/op
   SparkParquetFlatDataReadBenchmark.readIceberg                    ss    5  6.282 ± 0.287   s/op
   ```
   
   As we see, Iceberg can be more than 2 times slower in this case. The main reason is the absence of vectorized execution in Iceberg. If vectorization is disabled, Iceberg slightly outperforms the built-in file source (Iceberg classes are used to read Parquet).
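   For reference, the file source's vectorized path can be toggled through Spark's standard Parquet reader setting, e.g. as a `spark-submit` flag (the option below is a standard Spark SQL conf key; the benchmarks in this PR may set it differently):
   ```
   --conf spark.sql.parquet.enableVectorizedReader=false
   ```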
   
   If we add a projection, we get the following results:
   ```
   Benchmark                                                                    Mode  Cnt  Score   Error  Units
   SparkParquetFlatDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5  1.080 ± 0.077   s/op
   SparkParquetFlatDataReadBenchmark.readWithProjectionFileSourceVectorized       ss    5  0.401 ± 0.088   s/op
   SparkParquetFlatDataReadBenchmark.readWithProjectionIceberg                    ss    5  1.181 ± 0.032   s/op
   ```
   
   #### Nested Data
   
   Now we evaluate a data set with nested data. See 
`SparkParquetNestedDataBenchmark` for its schema.
   
   ```
   Benchmark                                                        Mode  Cnt  Score   Error  Units
   SparkParquetNestedDataReadBenchmark.readFileSourceNonVectorized    ss    5  4.038 ± 0.163   s/op
   SparkParquetNestedDataReadBenchmark.readFileSourceVectorized       ss    5  4.176 ± 0.120   s/op
   SparkParquetNestedDataReadBenchmark.readIceberg                    ss    5  2.869 ± 0.141   s/op
   ```
   
   Iceberg performs better on nested data because the built-in file source in Spark does not support vectorized execution for nested data. Moreover, there is an issue in Spark 2.4.0: if vectorized execution is not explicitly disabled, the generated code becomes more complicated and Spark might have to reconstruct each row. I did not manage to reproduce this issue on master. I believe [this Spark change](https://github.com/apache/spark/pull/23127/files) could fix it.
   
   If we add a projection on a nested column, we get the following results.
   
   ```
   Benchmark                                                                      Mode  Cnt  Score   Error  Units
   SparkParquetNestedDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5  1.321 ± 0.084   s/op
   SparkParquetNestedDataReadBenchmark.readWithProjectionFileSourceVectorized       ss    5  1.341 ± 0.045   s/op
   SparkParquetNestedDataReadBenchmark.readWithProjectionIceberg                    ss    5  2.351 ± 0.146   s/op
   ```
   
   Unfortunately, Spark does not ask Iceberg to prune the schema down to the requested nested field. Nested schema pruning is quite recent in Spark: it is available only for Parquet and is disabled by default. This is not a problem on the Iceberg side; Spark simply does not push a properly pruned schema to arbitrary data sources.
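   For context, as of Spark 2.4 the Parquet-only pruning mentioned above sits behind an internal flag (the conf key below exists in Spark 2.4, but since it is internal, its name or default may change):
   ```
   --conf spark.sql.optimizer.nestedSchemaPruning.enabled=true
   ```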
   
   ### File Skipping Capabilities
   
   This benchmark uses 500 files with 10000 records each, where every file corresponds to a particular date, and a query that is interested in only 1 file out of 500. The goal is to evaluate the file skipping capabilities of Iceberg.
   
   #### Flat Data
   
   ```
   Benchmark                                                                    Mode  Cnt  Score   Error  Units
   SparkParquetFlatDataFilterBenchmark.readWithFilterFileSourceNonVectorized      ss    5  1.590 ± 0.338   s/op
   SparkParquetFlatDataFilterBenchmark.readWithFilterFileSourceVectorized         ss    5  1.541 ± 0.251   s/op
   SparkParquetFlatDataFilterBenchmark.readWithFilterIceberg                      ss    5  0.067 ± 0.007   s/op
   ```
   To be fair, the difference is so big partly because our configuration forces Spark to process only one file per task, so we end up with only 1 task with Iceberg compared to 500 tasks with the file source. The other reason is that Iceberg does not have to read the remaining 499 files at all.
   
   
   #### Nested Data
   
   ```
   Benchmark                                                                      Mode  Cnt  Score   Error  Units
   SparkParquetNestedDataFilterBenchmark.readWithFilterFileSourceNonVectorized      ss    5  3.396 ± 0.387   s/op
   SparkParquetNestedDataFilterBenchmark.readWithFilterFileSourceVectorized         ss    5  3.416 ± 0.351   s/op
   SparkParquetNestedDataFilterBenchmark.readWithFilterIceberg                      ss    5  5.634 ± 0.397   s/op
   ```
   
   As of now, Iceberg does not compute min/max values for nested columns. 
Consequently, it cannot skip files based on predicates for nested fields. This 
is tracked in [this 
issue](https://github.com/apache/incubator-iceberg/issues/78).
   
   TODO: investigate why Iceberg is still slower than the file source here. The number of records is small, so the performance difference most likely comes from somewhere else.
   
   ### Writing Performance
   
   This benchmark tests the performance of writing 5,000,000 records in Parquet format with gzip compression.
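   For reference, gzip can be selected through Spark's standard Parquet codec option (a standard Spark SQL conf key; the benchmarks may configure it elsewhere):
   ```
   --conf spark.sql.parquet.compression.codec=gzip
   ```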
   
   #### Flat Data
   
   ```
   Benchmark                                            Mode  Cnt   Score   Error  Units
   SparkParquetFlatDataWriteBenchmark.writeFileSource     ss    5  16.313 ± 0.570   s/op
   SparkParquetFlatDataWriteBenchmark.writeIceberg        ss    5  16.115 ± 0.382   s/op
   ```
   
   The performance is almost identical on flat data.
   
   #### Nested Data
   
   ```
   Benchmark                                             Mode  Cnt  Score   Error  Units
   SparkParquetNestedDataWriteBenchmark.writeFileSource    ss    5  8.702 ± 0.431   s/op
   SparkParquetNestedDataWriteBenchmark.writeIceberg       ss    5  8.331 ± 0.231   s/op
   ```
   
   As before, Iceberg outperforms the built-in file source on nested data.
   
   ## Open Questions
   
   ### Ways to run
   
   [The recommended way](https://openjdk.java.net/projects/code-tools/jmh/) to run JMH benchmarks is via Maven or Gradle. However, we can still try integrating them into JUnit if needed.
   
   ### JMH plugin config
   
   One option is to have a separate JMH config in each subproject inside `build.gradle`. As the configuration is identical, I decided to use an if statement in `subprojects`. The if condition will be extended with `iceberg-hive` and `iceberg-core` for benchmarks that cover `HiveTables` and `HadoopTables`.
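   A hypothetical sketch of what that shared setup could look like (the plugin id is the commonly used `me.champeau.gradle.jmh`, but the project list, property names, and extension settings below are illustrative, not the actual config in this PR):
   ```
   // Sketch only: apply one shared JMH config to selected subprojects.
   subprojects {
     if (project.name in ['iceberg-spark']) {
       apply plugin: 'me.champeau.gradle.jmh'
       jmh {
         include = [jmhIncludeRegex]        // hypothetical property from gradle.properties
         resultsFile = file(jmhOutputPath)  // hypothetical property from gradle.properties
       }
     }
   }
   ```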
   
   ### Configuring output path & include regex
   
   Right now, it is done via `gradle.properties`. If there are any objections, 
we can migrate this logic directly into `build.gradle`.
   
   ### Representing `=> Unit` in Java

   I did not find a suitable built-in Java class to represent Scala's `=> Unit`. Therefore, I created a functional interface called `Action`. Let me know if there is a better alternative.
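   A minimal sketch of such an interface (names and the demo wrapper are illustrative; the PR's actual definition may differ):
   ```java
   // Hypothetical sketch: a zero-argument, void-returning functional interface,
   // mirroring a Scala by-name `=> Unit` parameter.
   @FunctionalInterface
   interface Action {
     void invoke();
   }

   class ActionExample {
     // Runs the given action once; a benchmark could wrap timed code like this.
     static void run(Action action) {
       action.invoke();
     }

     public static void main(String[] args) {
       StringBuilder log = new StringBuilder();
       run(() -> log.append("materialized"));
       System.out.println(log); // prints "materialized"
     }
   }
   ```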
   
   ### Materializing Datasets
   
   One way to materialize Datasets in Java:
   ```
   ds.foreach(record -> {});
   ```
   This approach computes the underlying `rdd` as follows:
   ```
   lazy val rdd: RDD[T] = {
     val objectType = exprEnc.deserializer.dataType
     rddQueryExecution.toRdd.mapPartitions { rows =>
       rows.map(_.get(0, objectType).asInstanceOf[T])
     }
   }
   ```
   The current way looks a bit ugly:
   ```
   ds.queryExecution().toRdd().toJavaRDD().foreach(record -> {});
   ```
   However, it should be more efficient. It computes the underlying `rdd` as 
follows:
   ```
   /** Internal version of the RDD. Avoids copies and has no schema */
   lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
   ```
   
