Repository: carbondata Updated Branches: refs/heads/encoding_override b699ee6f7 -> 403c3d9b4
single_pass blocked for global_sort Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a4083bf1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a4083bf1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a4083bf1 Branch: refs/heads/encoding_override Commit: a4083bf1ada27d257ce2f666018ef361c7f60c1d Parents: b699ee6 Author: rahulforallp <rahul.ku...@knoldus.in> Authored: Fri Jun 30 00:06:56 2017 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Wed Jul 5 09:00:16 2017 +0530 ---------------------------------------------------------------------- .../execution/command/carbonTableSchema.scala | 1 + .../execution/command/carbonTableSchema.scala | 221 +++++++++++-------- .../store/CarbonFactDataHandlerColumnar.java | 8 + 3 files changed, 137 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4083bf1/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index fa5a0d6..ee77f35 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -450,6 +450,7 @@ class TableNewProcessor(cm: TableModel) { // Setting the boolean value of useInvertedIndex in column schema val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq()) + LOGGER.info("NoINVERTEDINDEX columns are : " + noInvertedIndexCols.mkString(",")) for (column <- allColumns) { // When the 
column is measure or the specified no inverted index column in DDL, // set useInvertedIndex to false, otherwise true. http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4083bf1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index bb5bdd1..8e7db45 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -356,6 +356,84 @@ case class LoadTable( val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + private def getFinalOptions(carbonProperty: CarbonProperties): scala.collection + .mutable.Map[String, String] = { + val optionsFinal = scala.collection.mutable.Map[String, String]() // reference never reassigned; contents filled via put + optionsFinal.put("delimiter", options.getOrElse("delimiter", ",")) + optionsFinal.put("quotechar", options.getOrElse("quotechar", "\"")) + optionsFinal.put("fileheader", options.getOrElse("fileheader", "")) + optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\")) + optionsFinal.put("commentchar", options.getOrElse("commentchar", "#")) + optionsFinal.put("columndict", options.getOrElse("columndict", null)) + optionsFinal + .put("serialization_null_format", options.getOrElse("serialization_null_format", "\\N")) + optionsFinal.put("bad_records_logger_enable", options.getOrElse("bad_records_logger_enable", + carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))) + val badRecordActionValue = carbonProperty +
.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) + optionsFinal.put("bad_records_action", options.getOrElse("bad_records_action", carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + badRecordActionValue))) + optionsFinal + .put("is_empty_data_bad_record", options.getOrElse("is_empty_data_bad_record", carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))) + optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", "")) + optionsFinal + .put("complex_delimiter_level_1", options.getOrElse("complex_delimiter_level_1", "\\$")) + optionsFinal + .put("complex_delimiter_level_2", options.getOrElse("complex_delimiter_level_2", "\\:")) + optionsFinal.put("dateformat", options.getOrElse("dateformat", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))) + + optionsFinal.put("global_sort_partitions", options.getOrElse("global_sort_partitions", + carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))) + + optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null)) + optionsFinal.put("sort_scope", options + .getOrElse("sort_scope", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, + carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) + + optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, + carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))) + optionsFinal.put("bad_record_path", 
options.getOrElse("bad_record_path", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, + carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))) + + val useOnePass = options.getOrElse("single_pass", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, + CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match { + case "true" => + true + case "false" => + // when single_pass = false and if either alldictionarypath + // or columnDict is configured the do not allow load + if (StringUtils.isNotEmpty(optionsFinal.get("all_dictionary_path").get) || + StringUtils.isNotEmpty(optionsFinal.get("columndict").get)) { + throw new MalformedCarbonCommandException( + "Can not use all_dictionary_path or columndict without single_pass.") + } else { + false + } + case illegal => + LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " + + "Please set it as 'true' or 'false'") + false + } + optionsFinal.put("single_pass", useOnePass.toString) + optionsFinal + } + private def checkDefaultValue(value: String, default: String) = { if (StringUtils.isEmpty(value)) { default @@ -390,6 +468,7 @@ case class LoadTable( val carbonProperty: CarbonProperties = CarbonProperties.getInstance() carbonProperty.addProperty("zookeeper.enable.lock", "false") + val optionsFinal = getFinalOptions(carbonProperty) val carbonLock = CarbonLockFactory .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier .getCarbonTableIdentifier, @@ -426,66 +505,39 @@ case class LoadTable( val partitionLocation = relation.tableMeta.storePath + "/partition/" + relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" + relation.tableMeta.carbonTableIdentifier.getTableName + "/" - - val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean - - val delimiter = 
options.getOrElse("delimiter", ",") - val quoteChar = options.getOrElse("quotechar", "\"") - val fileHeader = options.getOrElse("fileheader", "") - val escapeChar = options.getOrElse("escapechar", "\\") - val commentChar = options.getOrElse("commentchar", "#") - val columnDict = options.getOrElse("columndict", null) - val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N") - val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", - carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)) - val badRecordActionValue = carbonProperty - .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - val badRecordsAction = options.getOrElse("bad_records_action", carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, - badRecordActionValue)) - val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)) - val allDictionaryPath = options.getOrElse("all_dictionary_path", "") - val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$") - val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:") - val dateFormat = options.getOrElse("dateformat", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)) + val sort_scope = optionsFinal.get("sort_scope").get + val single_pass = optionsFinal.get("single_pass").get + val bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable").get + val bad_records_action = optionsFinal.get("bad_records_action").get + val bad_record_path = 
optionsFinal.get("bad_record_path").get + val global_sort_partitions = optionsFinal.get("global_sort_partitions").get + val dateFormat = optionsFinal.get("dateformat").get + val delimeter = optionsFinal.get("delimiter").get + val complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1").get + val complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2").get + val all_dictionary_path = optionsFinal.get("all_dictionary_path").get + val column_dict = optionsFinal.get("columndict").get + if (sort_scope.equalsIgnoreCase("GLOBAL_SORT") && + single_pass.equalsIgnoreCase("TRUE")) { // sort_scope may be any case; single_pass is stored as lowercase "true"/"false" + sys.error("Global_Sort can't be used with single_pass flow") + } ValidateUtil.validateDateFormat(dateFormat, table, tableName) - val maxColumns = options.getOrElse("maxcolumns", null) - val sortScope = options - .getOrElse("sort_scope", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, - carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))) - ValidateUtil.validateSortScope(table, sortScope) - val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, - carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))) - val bad_record_path = options.getOrElse("bad_record_path", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))) - if (badRecordsLoggerEnable.toBoolean || - LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) { + ValidateUtil.validateSortScope(table, sort_scope) + + + if (bad_records_logger_enable.toBoolean || + LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) { if
(!CarbonUtil.isValidBadStorePath(bad_record_path)) { sys.error("Invalid bad records location.") } } carbonLoadModel.setBadRecordsLocation(bad_record_path) - val globalSortPartitions = options.getOrElse("global_sort_partitions", - carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)) - ValidateUtil.validateGlobalSortPartitions(globalSortPartitions) - carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\")) - carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) - carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#")) + + ValidateUtil.validateGlobalSortPartitions(global_sort_partitions) + carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar").get, "\\")) + carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar").get, "\"")) + carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar").get, "#")) carbonLoadModel.setDateFormat(dateFormat) carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty( CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, @@ -495,65 +547,48 @@ case class LoadTable( CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel .setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat) + TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + + optionsFinal.get("serialization_null_format").get) carbonLoadModel .setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable) + TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable) carbonLoadModel .setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction) + TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action) carbonLoadModel .setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord) - 
carbonLoadModel.setSortScope(sortScope) - carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB) - carbonLoadModel.setGlobalSortPartitions(globalSortPartitions) - val useOnePass = options.getOrElse("single_pass", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, - CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match { - case "true" => - true - case "false" => - // when single_pass = false and if either alldictionarypath - // or columnDict is configured the do not allow load - if (StringUtils.isNotEmpty(allDictionaryPath) || StringUtils.isNotEmpty(columnDict)) { - throw new MalformedCarbonCommandException( - "Can not use all_dictionary_path or columndict without single_pass.") - } else { - false - } - case illegal => - LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " + - "Please set it as 'true' or 'false'") - false - } - carbonLoadModel.setUseOnePass(useOnePass) - if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) || - complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) || - delimiter.equalsIgnoreCase(complex_delimiter_level_2)) { + DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + + optionsFinal.get("is_empty_data_bad_record").get) + carbonLoadModel.setSortScope(sort_scope) + carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb").get) + carbonLoadModel.setGlobalSortPartitions(global_sort_partitions) + carbonLoadModel.setUseOnePass(single_pass.toBoolean) + if (delimeter.equalsIgnoreCase(complex_delimeter_level1) || + complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) || + delimeter.equalsIgnoreCase(complex_delimeter_level2)) { sys.error(s"Field Delimiter & Complex types delimiter are same") } else { carbonLoadModel.setComplexDelimiterLevel1( - CarbonUtil.delimiterConverter(complex_delimiter_level_1)) + CarbonUtil.delimiterConverter(complex_delimeter_level1)) 
carbonLoadModel.setComplexDelimiterLevel2( - CarbonUtil.delimiterConverter(complex_delimiter_level_2)) + CarbonUtil.delimiterConverter(complex_delimeter_level2)) } // set local dictionary path, and dictionary file extension - carbonLoadModel.setAllDictPath(allDictionaryPath) + carbonLoadModel.setAllDictPath(all_dictionary_path) val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS try { // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") carbonLoadModel.setFactFilePath(factPath) - carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter)) - carbonLoadModel.setCsvHeader(fileHeader) - carbonLoadModel.setColDictFilePath(columnDict) + carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)) + carbonLoadModel.setCsvHeader(optionsFinal.get("fileheader").get) + carbonLoadModel.setColDictFilePath(column_dict) carbonLoadModel.setDirectLoad(true) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns, - maxColumns) + optionsFinal.get("maxcolumns").get) carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata val storePath = relation.tableMeta.storePath @@ -561,7 +596,7 @@ case class LoadTable( CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) } if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass && - StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) { + StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) { LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load") LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load") carbonLoadModel.setUseOnePass(false) @@ -583,7 +618,7 @@ case class 
LoadTable( .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier, dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath) } - if (!StringUtils.isEmpty(allDictionaryPath)) { + if (!StringUtils.isEmpty(all_dictionary_path)) { carbonLoadModel.initPredefDictMap() GlobalDictionaryUtil .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext, @@ -592,7 +627,7 @@ case class LoadTable( carbonTableIdentifier, dictFolderPath, dimensions, - allDictionaryPath) + all_dictionary_path) } // dictionaryServerClient dictionary generator val dictionaryServerPort = carbonProperty http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4083bf1/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 01e3ab6..429c5a3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.NodeHolder; @@ -201,6 +202,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } this.version = CarbonProperties.getInstance().getFormatVersion(); this.encoder = new 
TablePageEncoder(model); + String noInvertedIdxCol = ""; + for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) { + if (!cd.isUseInvertedIndex()) { + noInvertedIdxCol += (cd.getColName() + ","); + } + } + LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol); } private void initParameters(CarbonFactDataHandlerModel model) {