[CARBONDATA-3074] Change default sort temp compressor to snappy

The sort temp compressor used to default to empty, which means that CarbonData does not compress the sort temp files. This PR changes the default value to snappy. Experiments on a local cluster show that setting the compressor to 'snappy' slightly improves loading performance and substantially reduces disk IO during data loading.
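To make the new default concrete, here is a minimal, self-contained sketch of how a lookup of `carbon.sort.temp.compressor` might behave after this change. It only models the semantics described in this commit (unset falls back to SNAPPY, an explicit empty value disables compression). The class `SortTempCompressorSketch` and its methods are hypothetical and are not part of CarbonData; rejecting unknown names is an assumption of the sketch, not documented behaviour.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Hypothetical sketch: models the documented semantics only, not CarbonData's own code.
public class SortTempCompressorSketch {
  // Property key as it appears in CarbonCommonConstants.
  static final String KEY = "carbon.sort.temp.compressor";
  // New default value introduced by this commit.
  static final String DEFAULT = "SNAPPY";
  // Values listed in the docs; the empty string means "do not compress".
  static final Set<String> VALID =
      new HashSet<>(Arrays.asList("SNAPPY", "GZIP", "BZIP2", "LZ4", "ZSTD", ""));

  // Returns the configured compressor name, falling back to the default when unset.
  // Throwing on unknown names is a choice of this sketch (assumption).
  public static String resolve(Properties props) {
    String value = props.getProperty(KEY, DEFAULT).trim();
    if (!VALID.contains(value)) {
      throw new IllegalArgumentException("Unsupported sort temp compressor: " + value);
    }
    return value;
  }

  // True unless the property was explicitly set to the empty string.
  public static boolean compressionEnabled(Properties props) {
    return !resolve(props).isEmpty();
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println("unset -> '" + resolve(props) + "'");
    props.setProperty(KEY, "");
    System.out.println("empty -> compression enabled: " + compressionEnabled(props));
  }
}
```

In CarbonData itself, the tests added by this commit restore the old uncompressed behaviour with `CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR, "")`.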
This closes #2894

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6c9b3959
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6c9b3959
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6c9b3959

Branch: refs/heads/branch-1.5
Commit: 6c9b39595b769ba4bb4c1a759d32a7c9e4faf3f5
Parents: e0963c1
Author: Manhua <kevin...@qq.com>
Authored: Mon Nov 5 17:28:07 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Wed Nov 21 22:39:53 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java     |  6 +++---
 docs/configuration-parameters.md                  |  3 +--
 docs/performance-tuning.md                        |  2 +-
 .../dataload/TestLoadWithSortTempCompressed.scala | 16 ++++++++++++++++
 4 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c9b3959/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index bf4f7e5..9484bb4 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -873,10 +873,10 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SORT_TEMP_COMPRESSOR = "carbon.sort.temp.compressor";
   /**
-   * The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD'.
-   * By default, empty means that Carbondata will not compress the sort temp files.
+   * The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty.
+   * Specially, empty means that Carbondata will not compress the sort temp files.
    */
-  public static final String CARBON_SORT_TEMP_COMPRESSOR_DEFAULT = "";
+  public static final String CARBON_SORT_TEMP_COMPRESSOR_DEFAULT = "SNAPPY";
   /**
    * Which storage level to persist rdd when sort_scope=global_sort
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c9b3959/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 2a3748c..5a4dea6 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -79,12 +79,11 @@ This section provides the details of all the configurations required for the Car
 | enable.inmemory.merge.sort | false | CarbonData sorts and writes data to intermediate files to limit the memory usage. These intermediate files needs to be sorted again using merge sort before writing to the final carbondata file.Performing merge sort in memory would increase the sorting performance at the cost of increased memory footprint. This Configuration specifies to do in-memory merge sort or to do file based merge sort. |
 | carbon.sort.storage.inmemory.size.inmb | 512 | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits. When ***enable.unsafe.sort*** configuration is enabled, instead of using ***carbon.sort.size*** which is based on rows count, size occupied in memory is used to determine when to flush data pages to intermediate temp files. This configuration determines the memory to be used for storing data pages in memory. **NOTE:** Configuring a higher value ensures more data is maintained in memory and hence increases data loading performance due to reduced or no IO.Based on the memory availability in the nodes of the cluster, configure the values accordingly. |
 | carbon.load.sortmemory.spill.percentage | 0 | During data loading, some data pages are kept in memory upto memory configured in ***carbon.sort.storage.inmemory.size.inmb*** beyond which they are spilled to disk as intermediate temporary sort files. This configuration determines after what percentage data needs to be spilled to disk. **NOTE:** Without this configuration, when the data pages occupy upto configured memory, new data pages would be dumped to disk and old pages are still maintained in disk. |
-| carbon.sort.temp.compressor | (none) | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits. These temporary files can be compressed and written in order to save the storage space. This configuration specifies the name of compressor to be used to compress the intermediate sort temp files during sort procedure in data loading. The valid values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means that Carbondata will not compress the sort temp files. **NOTE:** Compressor will be useful if you encounter disk bottleneck.Since the data needs to be compressed and decompressed,it involves additional CPU cycles,but is compensated by the high IO throughput due to less data to be written or read from the disks. |
 | carbon.enable.calculate.size | true | **For Load Operation**: Enabling this property will let carbondata calculate the size of the carbon data file (.carbondata) and the carbon index file (.carbonindex) for each load and update the table status file. **For Describe Formatted**: Enabling this property will let carbondata calculate the total size of the carbon data files and the carbon index files for the each table and display it in describe formatted command. **NOTE:** This is useful to determine the overall size of the carbondata table and also get an idea of how the table is growing in order to take up other backup strategy decisions. |
 | carbon.cutOffTimestamp | (none) | CarbonData has capability to generate the Dictionary values for the timestamp columns from the data itself without the need to store the computed dictionary values. This configuration sets the start date for calculating the timestamp. Java counts the number of milliseconds from start of "1970-01-01 00:00:00". This property is used to customize the start of position. For example "2000-01-01 00:00:00". **NOTE:** The date must be in the form ***carbon.timestamp.format***. CarbonData supports storing data for upto 68 years.For example, if the cut-off time is 1970-01-01 05:30:00, then data upto 2038-01-01 05:30:00 will be supported by CarbonData. |
 | carbon.timegranularity | SECOND | The configuration is used to specify the data granularity level such as DAY, HOUR, MINUTE, or SECOND. This helps to store more than 68 years of data into CarbonData. |
 | carbon.use.local.dir | true | CarbonData,during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to tmp directory of the container or to the YARN application directory. |
-| carbon.sort.temp.compressor | (none) | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits. These temporary files can be compressed and written in order to save the storage space. This configuration specifies the name of compressor to be used to compress the intermediate sort temp files during sort procedure in data loading. The valid values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means that Carbondata will not compress the sort temp files. **NOTE:** Compressor will be useful if you encounter disk bottleneck.Since the data needs to be compressed and decompressed,it involves additional CPU cycles,but is compensated by the high IO throughput due to less data to be written or read from the disks. |
+| carbon.sort.temp.compressor | SNAPPY | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits. These temporary files can be compressed and written in order to save the storage space. This configuration specifies the name of compressor to be used to compress the intermediate sort temp files during sort procedure in data loading. The valid values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. Specially, empty means that Carbondata will not compress the sort temp files. **NOTE:** Compressor will be useful if you encounter disk bottleneck.Since the data needs to be compressed and decompressed,it involves additional CPU cycles,but is compensated by the high IO throughput due to less data to be written or read from the disks. |
 | carbon.load.skewedDataOptimization.enabled | false | During data loading,CarbonData would divide the number of blocks equally so as to ensure all executors process same number of blocks. This mechanism satisfies most of the scenarios and ensures maximum parallel processing for optimal data loading performance.In some business scenarios, there might be scenarios where the size of blocks vary significantly and hence some executors would have to do more work if they get blocks containing more data. This configuration enables size based block allocation strategy for data loading. When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data.**NOTE:** This configuration is useful if the size of your input data files varies widely, say 1MB to 1GB.For this configuration to work effectively,knowing the data pattern and size is important and necessary. |
 | enable.data.loading.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional data loading statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time.It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all data loading performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
 | carbon.dictionary.chunk.size | 10000 | CarbonData generates dictionary keys and writes them to separate dictionary file during data loading. To optimize the IO, this configuration determines the number of dictionary keys to be persisted to dictionary file at a time. **NOTE:** Writing to file also serves as a commit point to the dictionary generated.Increasing more values in memory causes more data loss during system or application failure.It is advised to alter this configuration judiciously. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c9b3959/docs/performance-tuning.md
----------------------------------------------------------------------
diff --git a/docs/performance-tuning.md b/docs/performance-tuning.md
index 64f80c4..7059605 100644
--- a/docs/performance-tuning.md
+++ b/docs/performance-tuning.md
@@ -170,7 +170,7 @@
 | spark.executor.instances/spark.executor.cores/spark.executor.memory | spark/conf/spark-defaults.conf | Querying | The number of executors, CPU cores, and memory used for CarbonData query. | In the bank scenario, we provide the 4 CPUs cores and 15 GB for each executor which can get good performance. This 2 value does not mean more the better. It needs to be configured properly in case of limited resources. For example, In the bank scenario, it has enough CPU 32 cores each node but less memory 64 GB each node. So we cannot give more CPU but less memory. For example, when 4 cores and 12GB for each executor. It sometimes happens GC during the query which impact the query performance very much from the 3 second to more than 15 seconds. In this scenario need to increase the memory or decrease the CPU cores. |
 | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Querying | The buffer size to store records, returned from the block scan. | In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
 | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether use YARN local directories for multi-table load disk load balance | If this is set it to true CarbonData will use YARN local directories for multi-table load disk load balance, that will improve the data load performance. |
-| carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD', and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
+| carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD', and empty. Specially, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
 | carbon.load.skewedDataOptimization.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable size based block allocation strategy for data loading. | When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data -- It's useful if the size of your input data files varies widely, say 1MB to 1GB. |
 | carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable node minumun input data size allocation strategy for data loading.| When loading, carbondata will use node minumun input data size allocation strategy for task distribution. It will make sure the nodes load the minimum amount of data -- It's useful if the size of your input data files very small, say 1MB to 256MB,Avoid generating a large number of small files. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c9b3959/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
index 5fbdd14..21affee 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
@@ -81,6 +81,22 @@ class TestLoadWithSortTempCompressed extends QueryTest
     checkAnswer(sql(s"select count(*) from $simpleTable where c5 > 5001"), Row(5001))
   }
 
+  test("test data load for simple table without sort temp compressed and off-heap sort enabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR, "")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    testSimpleTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for simple table without sort temp compressed and off-heap sort disabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR, "")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    testSimpleTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
   test("test data load for simple table with sort temp compressed with snappy" +
     " and off-heap sort enabled") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,