http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index bd1264a..9e4f3b7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -43,11 +43,10 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
       " workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
       "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
       "utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties" +
-      "('dictionary_exclude'='empno','sort_columns'='empno')")
+      "('dictionary_exclude'='empno','sort_columns'='empno', 'SORT_SCOPE'='BATCH_SORT')")
     sql(
       s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable1a OPTIONS
-         |('DELIMITER'= ',', 'QUOTECHAR'= '\"','SORT_SCOPE'='BATCH_SORT',
-         |'batch_sort_size_inmb'='64')""".stripMargin)
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'batch_sort_size_inmb'='64')""".stripMargin)
     checkAnswer(sql("select empname from sorttable1a"),
       sql("select empname from origintable1 order by empname"))
   }
@@ -61,11 +60,11 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
       " workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
       "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
       "utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties" +
-      "('dictionary_exclude'='empno,empname,workgroupcategoryname','sort_columns'='empno,empname')")
+      "('dictionary_exclude'='empno,empname,workgroupcategoryname','sort_columns'='empno,empname'," +
+      "'SORT_SCOPE'='BATCH_SORT')")
     sql(
       s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable1b OPTIONS
-         |('DELIMITER'= ',', 'QUOTECHAR'= '\"','SORT_SCOPE'='BATCH_SORT',
-         |'batch_sort_size_inmb'='64')""".stripMargin)
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'batch_sort_size_inmb'='64')""".stripMargin)
     checkAnswer(sql("select empname from sorttable1b"),
       sql("select empname from origintable1 order by empname"))
   }
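The two tests above capture the intent of this change: SORT_SCOPE moves from a per-load OPTIONS entry to a table property declared once at CREATE TABLE time, which every subsequent load then inherits. A minimal sketch of the new usage in the same QueryTest style (the table name sorttable_demo is a placeholder):

    // Hypothetical table: SORT_SCOPE is declared once in tblproperties.
    sql(
      "CREATE TABLE sorttable_demo (empno int, empname String, salary int) " +
      "STORED BY 'org.apache.carbondata.format' " +
      "tblproperties('sort_columns'='empno', 'SORT_SCOPE'='BATCH_SORT')")
    // Loads no longer pass SORT_SCOPE; only the batch size stays a load option.
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable_demo
         |OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'batch_sort_size_inmb'='64')""".stripMargin)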
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index f2a4a7d..a73b0df 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -52,7 +52,7 @@ object ValidateUtil {
     if (sortScope != null) {
       // Don't support use global sort on partitioned table.
       if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null &&
-          sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
+          sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
         throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " +
           "table.")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 661f724..d0309ba 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -845,8 +845,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
-      "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB",
-      "GLOBAL_SORT_PARTITIONS", "HEADER"
+      "BATCH_SORT_SIZE_INMB", "GLOBAL_SORT_PARTITIONS", "SINGLE_PASS",
+      "IS_EMPTY_DATA_BAD_RECORD", "HEADER"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder
@@ -901,14 +901,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       }
     }
 
-    if (options.exists(_._1.equalsIgnoreCase("SORT_SCOPE"))) {
-      val optionValue: String = options.get("sort_scope").get.head._2
-      if (!SortScopeOptions.isValidSortOption(optionValue)) {
-        throw new MalformedCarbonCommandException(
-          "option SORT_SCOPE can have option either BATCH_SORT or LOCAL_SORT or GLOBAL_SORT")
-      }
-    }
-
     // check for duplicate options
     val duplicateOptions = options filter {
       case (_, optionlist) => optionlist.size > 1

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 98ceae8..8a39b0a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -39,9 +39,10 @@ import org.codehaus.jackson.map.ObjectMapper
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -173,6 +174,14 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
 
     val tableInfo: TableInfo = TableNewProcessor(cm)
 
+    // Add validation for sort scope when create table
+    val sortScope = tableInfo.getFactTable.getTableProperties
+      .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    if (!CarbonUtil.isValidSortOption(sortScope)) {
+      throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," +
+        s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+    }
+
     if (tableInfo.getFactTable.getListOfColumns.isEmpty) {
       sys.error("No Dimensions found. Table should have at least one dimesnion !")
     }
@@ -433,10 +442,25 @@ case class LoadTable(
       val dateFormat = options.getOrElse("dateformat", null)
       ValidateUtil.validateDateFormat(dateFormat, table, tableName)
       val maxColumns = options.getOrElse("maxcolumns", null)
-      val sortScope = options.getOrElse("sort_scope", null)
+      val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+      val sortScopeDefault = CarbonProperties.getInstance().
+        getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+      val sortScope = if (null == tableProperties) {
+        sortScopeDefault
+      } else {
+        tableProperties.getOrDefault("sort_scope", sortScopeDefault)
+      }
+
       ValidateUtil.validateSortScope(table, sortScope)
-      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
-      val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
+      val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+          carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
+      val globalSortPartitions = options.getOrElse("global_sort_partitions", carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))
       ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
 
       // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
@@ -884,6 +908,9 @@ private[sql] case class DescribeCommandFormatted(
     results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
     val carbonTable = relation.tableMeta.carbonTable
     results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+    results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
+      .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
+      .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
     results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
     if (colPropStr.length() > 0) {
       results ++= Seq((colPropStr, "", ""))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index de16f69..3f0153e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -345,6 +346,14 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
 
     val tableInfo: TableInfo = TableNewProcessor(cm)
 
+    // Add validation for sort scope when create table
+    val sortScope = tableInfo.getFactTable.getTableProperties
+      .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    if (!CarbonUtil.isValidSortOption(sortScope)) {
+      throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," +
+        s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+    }
+
     if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
       sys.error("No Dimensions found. Table should have at least one dimesnion !")
     }
@@ -562,16 +571,12 @@ case class LoadTable(
         .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)))
 
     optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-    optionsFinal.put("sort_scope", options
-      .getOrElse("sort_scope",
-        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-          carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
     optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb",
       carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
         carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
           CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
+
     optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path",
       carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
         carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
@@ -640,6 +645,15 @@ case class LoadTable(
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
     val optionsFinal = getFinalOptions(carbonProperty)
+
+    val tableProperties = relation.tableMeta.carbonTable.getTableInfo
+      .getFactTable.getTableProperties
+
+    optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+
     try {
       val factPath = if (dataFrame.isDefined) {
         ""
@@ -677,7 +691,6 @@ case class LoadTable(
       ValidateUtil.validateDateFormat(dateFormat, table, tableName)
       ValidateUtil.validateSortScope(table, sort_scope)
 
-
       if (bad_records_logger_enable.toBoolean ||
           LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
         if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
@@ -1140,6 +1153,9 @@ private[sql] case class DescribeCommandFormatted(
     results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
     val carbonTable = relation.tableMeta.carbonTable
     results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+    results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
+      .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
+      .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
     results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
     if (colPropStr.length() > 0) {
       results ++= Seq((colPropStr, "", ""))
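Taken together, the spark and spark2 changes resolve the effective sort scope identically: the table's own sort_scope property wins, then the carbon.options.sort.scope session property, then the carbon.load.sort.scope system property, and finally CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT. A standalone sketch of that fallback chain, where resolveSortScope and sessionProps are hypothetical stand-ins for the CarbonProperties lookups shown in the hunks above:

    import java.util.{Map => JMap}

    // Hypothetical helper mirroring the lookup order added in LoadTable;
    // sessionProps stands in for CarbonProperties.getInstance().
    def resolveSortScope(tableProps: JMap[String, String],
        sessionProps: Map[String, String]): String = {
      // carbon.options.sort.scope overrides carbon.load.sort.scope, which
      // overrides the compiled-in default ("LOCAL_SORT" in this version).
      val propertyDefault = sessionProps.getOrElse("carbon.options.sort.scope",
        sessionProps.getOrElse("carbon.load.sort.scope", "LOCAL_SORT"))
      // A table-level sort_scope, when present, takes precedence over both.
      if (tableProps == null) propertyDefault
      else tableProps.getOrDefault("sort_scope", propertyDefault)
    }

With the CreateTable validation added above, an invalid scope now fails at CREATE TABLE time instead of at first load, and DESCRIBE FORMATTED surfaces the chosen value as a SORT_SCOPE row.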