[CARBONDATA-1462] Add an option 'carbon.update.storage.level' to support configuring the storage level when updating data with 'carbon.update.persist.enable'='true'
When updating data with 'carbon.update.persist.enable'='true' (the default), the storage level of the dataset is always 'MEMORY_AND_DISK'; it should be possible to configure the storage level to suit different environments. This closes #1340 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0ab928e9 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0ab928e9 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0ab928e9 Branch: refs/heads/branch-1.2 Commit: 0ab928e9c1730d69a3fcd1805c26ef1200214fc9 Parents: 8b38e0b Author: Zhang Zhichao <441586...@qq.com> Authored: Fri Sep 8 13:27:42 2017 +0800 Committer: Ravindra Pesala <ravi.pes...@gmail.com> Committed: Mon Sep 11 20:33:57 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 20 +++++++++++ .../carbondata/core/util/CarbonProperties.java | 36 ++++++++++++++++++++ .../sql/execution/command/IUDCommands.scala | 17 ++------- 3 files changed, 59 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 6c116a7..5a68f60 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1091,6 +1091,26 @@ public final class CarbonCommonConstants { public static final String defaultValueIsPersistEnabled = "true"; /** + * Which storage level to persist dataset when updating data + * with 
'carbon.update.persist.enable'='true' + */ + @CarbonProperty + public static final String CARBON_UPDATE_STORAGE_LEVEL = + "carbon.update.storage.level"; + + /** + * The default value(MEMORY_AND_DISK) is the same as the default storage level of Dataset. + * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because + * recomputing the in-memory columnar representation of the underlying table is expensive. + * + * if user's executor has less memory, set the CARBON_UPDATE_STORAGE_LEVEL + * to MEMORY_AND_DISK_SER or other storage level to correspond to different environment. + * You can get more recommendations about storage level in spark website: + * http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence. + */ + public static final String CARBON_UPDATE_STORAGE_LEVEL_DEFAULT = "MEMORY_AND_DISK"; + + /** * current data file version */ public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V3"; http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 4e9c21a..0ab28e2 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -890,6 +890,42 @@ public final class CarbonProperties { } /** + * Return valid CARBON_UPDATE_STORAGE_LEVEL + * @return boolean + */ + public boolean isPersistUpdateDataset() { + String isPersistEnabled = getProperty(CarbonCommonConstants.isPersistEnabled, + CarbonCommonConstants.defaultValueIsPersistEnabled); + boolean validatePersistEnabled = CarbonUtil.validateBoolean(isPersistEnabled); + if (!validatePersistEnabled) { + LOGGER.error("The " + 
CarbonCommonConstants.isPersistEnabled + + " configuration value is invalid. It will use default value(" + + CarbonCommonConstants.defaultValueIsPersistEnabled + + ")."); + isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled; + } + return isPersistEnabled.equalsIgnoreCase("true"); + } + + /** + * Return valid storage level for CARBON_UPDATE_STORAGE_LEVEL + * @return String + */ + public String getUpdateDatasetStorageLevel() { + String storageLevel = getProperty(CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL, + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT); + boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel); + if (!validateStorageLevel) { + LOGGER.error("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL + + " configuration value is invalid. It will use default storage level(" + + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT + + ") to persist dataset."); + storageLevel = CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT; + } + return storageLevel.toUpperCase(); + } + + /** * returns true if carbon property * @param key * @return http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala index d3a80d4..5820b9d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala @@ -175,16 +175,7 @@ private[sql] case class ProjectForUpdateCommand( val currentTime = CarbonUpdateUtil.readCurrentTime // var dataFrame: DataFrame = null var dataSet: DataFrame = null - val isPersistEnabledUserValue = 
CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.isPersistEnabled, - CarbonCommonConstants.defaultValueIsPersistEnabled) - var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean - if (isPersistEnabledUserValue.equalsIgnoreCase("false")) { - isPersistEnabled = false - } - else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) { - isPersistEnabled = true - } + var isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset() try { lockStatus = metadataLock.lockWithRetries() if (lockStatus) { @@ -199,13 +190,11 @@ private[sql] case class ProjectForUpdateCommand( // Get RDD. dataSet = if (isPersistEnabled) { - Dataset.ofRows(sparkSession, plan).persist(StorageLevel.MEMORY_AND_DISK) - // DataFrame(sqlContext, plan) - // .persist(StorageLevel.MEMORY_AND_DISK) + Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString( + CarbonProperties.getInstance.getUpdateDatasetStorageLevel())) } else { Dataset.ofRows(sparkSession, plan) - // DataFrame(sqlContext, plan) } var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")