http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index 8d394db..e69de29 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -1,610 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.util - -import java.text.SimpleDateFormat -import java.util -import java.util.{Date, List, Locale} - -import scala.collection.{immutable, mutable} -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import org.apache.commons.lang3.StringUtils -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType} -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} -import org.apache.spark.sql.util.CarbonException -import org.apache.spark.sql.util.SparkSQLUtil.sessionState - -import org.apache.carbondata.common.constants.LoggerAction -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} -import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants -import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat -import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} -import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark.LOGGER -import org.apache.carbondata.spark.load.ValidateUtil -import org.apache.carbondata.spark.rdd.SerializableConfiguration - -/** - * the util object of data loading - */ -object DataLoadingUtil { - - val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - /** - * get data loading options and initialise default value - */ - def getDataLoadingOptions( - carbonProperty: CarbonProperties, - options: immutable.Map[String, String]): mutable.Map[String, String] = { - val optionsFinal = scala.collection.mutable.Map[String, String]() - optionsFinal.put("delimiter", options.getOrElse("delimiter", ",")) - optionsFinal.put("quotechar", options.getOrElse("quotechar", "\"")) - optionsFinal.put("fileheader", options.getOrElse("fileheader", "")) - optionsFinal.put("commentchar", options.getOrElse("commentchar", "#")) - optionsFinal.put("columndict", options.getOrElse("columndict", null)) - - optionsFinal.put("escapechar", - CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\"))) - - optionsFinal.put( - "serialization_null_format", - options.getOrElse("serialization_null_format", "\\N")) - - optionsFinal.put( - "bad_records_logger_enable", - options.getOrElse( - "bad_records_logger_enable", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))) - - val badRecordActionValue = carbonProperty.getProperty( - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - - optionsFinal.put( - "bad_records_action", - options.getOrElse( - "bad_records_action", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, - badRecordActionValue))) - - optionsFinal.put( - "is_empty_data_bad_record", - options.getOrElse( - "is_empty_data_bad_record", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))) - - optionsFinal.put( - "skip_empty_line", - options.getOrElse( - "skip_empty_line", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE))) - - optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", "")) - - optionsFinal.put( - "complex_delimiter_level_1", - options.getOrElse("complex_delimiter_level_1", "$")) - - optionsFinal.put( - "complex_delimiter_level_2", - options.getOrElse("complex_delimiter_level_2", ":")) - - optionsFinal.put( - "dateformat", - options.getOrElse( - "dateformat", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))) - - optionsFinal.put( - "timestampformat", - options.getOrElse( - "timestampformat", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT))) - - optionsFinal.put( - "global_sort_partitions", - options.getOrElse( - "global_sort_partitions", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, - null))) - - optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null)) - - optionsFinal.put( - "batch_sort_size_inmb", - options.getOrElse( - "batch_sort_size_inmb", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, - carbonProperty.getProperty( - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))) - - optionsFinal.put( - "bad_record_path", - options.getOrElse( - "bad_record_path", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperty.getProperty( - CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))) - - val useOnePass = options.getOrElse( - "single_pass", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, - CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match { - case "true" => - true - case "false" => - // when single_pass = false and if either alldictionarypath - // or columnDict is configured the do not allow load - if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) || - StringUtils.isNotEmpty(optionsFinal("columndict"))) { - throw new MalformedCarbonCommandException( - "Can not use all_dictionary_path or columndict without single_pass.") - } else { - false - } - case illegal => - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " + - "Please set it as 'true' or 'false'") - false - } - optionsFinal.put("single_pass", useOnePass.toString) - optionsFinal - } - - /** - * check whether using default value or not - */ - private def checkDefaultValue(value: String, default: String) = { - if (StringUtils.isEmpty(value)) { - default - } else { - value - } - } - - /** - * build CarbonLoadModel for data loading - * @param table CarbonTable object containing all metadata information for the table - * like table name, table path, schema, etc - * @param options Load options from user input - * @return a new CarbonLoadModel instance - */ - def buildCarbonLoadModelJava( - table: CarbonTable, - options: java.util.Map[String, String] - ): CarbonLoadModel = { - val carbonProperty: CarbonProperties = CarbonProperties.getInstance - val optionsFinal = getDataLoadingOptions(carbonProperty, options.asScala.toMap) - optionsFinal.put("sort_scope", "no_sort") - if (!options.containsKey("fileheader")) { - val csvHeader = table.getCreateOrderColumn(table.getTableName) - .asScala.map(_.getColName).mkString(",") - optionsFinal.put("fileheader", csvHeader) - } - val model = new CarbonLoadModel() - buildCarbonLoadModel( - table = table, - carbonProperty = carbonProperty, - options = options.asScala.toMap, - optionsFinal = optionsFinal, - carbonLoadModel = model, - hadoopConf = null) // we have provided 'fileheader', so it can be null - - // set default values - model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) - model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) - model.setUseOnePass(options.asScala.getOrElse("onepass", "false").toBoolean) - model.setDictionaryServerHost(options.asScala.getOrElse("dicthost", null)) - model.setDictionaryServerPort(options.asScala.getOrElse("dictport", "-1").toInt) - model - } - - /** - * build CarbonLoadModel for data loading - * @param table CarbonTable object containing all metadata information for the table - * like table name, table path, schema, etc - * @param carbonProperty Carbon property instance - * @param options Load options from user input - * @param optionsFinal Load options that populated with default values for optional options - * @param carbonLoadModel The output load model - * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in - * user provided load options - */ - def buildCarbonLoadModel( - table: CarbonTable, - carbonProperty: CarbonProperties, - options: immutable.Map[String, String], - optionsFinal: mutable.Map[String, String], - carbonLoadModel: CarbonLoadModel, - hadoopConf: Configuration, - partition: Map[String, Option[String]] = Map.empty, - isDataFrame: Boolean = false): Unit = { - carbonLoadModel.setTableName(table.getTableName) - carbonLoadModel.setDatabaseName(table.getDatabaseName) - carbonLoadModel.setTablePath(table.getTablePath) - carbonLoadModel.setTableName(table.getTableName) - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - val sort_scope = optionsFinal("sort_scope") - val single_pass = optionsFinal("single_pass") - val bad_records_logger_enable = optionsFinal("bad_records_logger_enable") - val bad_records_action = optionsFinal("bad_records_action") - var bad_record_path = optionsFinal("bad_record_path") - val global_sort_partitions = optionsFinal("global_sort_partitions") - val timestampformat = optionsFinal("timestampformat") - val dateFormat = optionsFinal("dateformat") - val delimeter = optionsFinal("delimiter") - val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1") - val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2") - val all_dictionary_path = optionsFinal("all_dictionary_path") - val column_dict = optionsFinal("columndict") - ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat") - ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat") - ValidateUtil.validateSortScope(table, sort_scope) - - if (bad_records_logger_enable.toBoolean || - LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) { - if (!CarbonUtil.isValidBadStorePath(bad_record_path)) { - CarbonException.analysisException("Invalid bad records location.") - } - bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path) - } - carbonLoadModel.setBadRecordsLocation(bad_record_path) - - ValidateUtil.validateGlobalSortPartitions(global_sort_partitions) - carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\")) - carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\"")) - carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#")) - - // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option, - // we should use table schema to generate file header. - var fileHeader = optionsFinal("fileheader") - val headerOption = options.get("header") - if (headerOption.isDefined) { - // whether the csv file has file header - // the default value is true - val header = try { - headerOption.get.toBoolean - } catch { - case ex: IllegalArgumentException => - throw new MalformedCarbonCommandException( - "'header' option should be either 'true' or 'false'. " + ex.getMessage) - } - if (header) { - if (fileHeader.nonEmpty) { - throw new MalformedCarbonCommandException( - "When 'header' option is true, 'fileheader' option is not required.") - } - } else { - if (fileHeader.isEmpty) { - fileHeader = table.getCreateOrderColumn(table.getTableName) - .asScala.map(_.getColName).mkString(",") - } - } - } - - carbonLoadModel.setTimestampformat(timestampformat) - carbonLoadModel.setDateFormat(dateFormat) - carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty( - CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) - - carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty( - CarbonCommonConstants.CARBON_DATE_FORMAT, - CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) - - carbonLoadModel.setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + - optionsFinal("serialization_null_format")) - - carbonLoadModel.setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable) - - carbonLoadModel.setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action.toUpperCase) - - carbonLoadModel.setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + - optionsFinal("is_empty_data_bad_record")) - - carbonLoadModel.setSkipEmptyLine(optionsFinal("skip_empty_line")) - - carbonLoadModel.setSortScope(sort_scope) - carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb")) - carbonLoadModel.setGlobalSortPartitions(global_sort_partitions) - carbonLoadModel.setUseOnePass(single_pass.toBoolean) - - if (delimeter.equalsIgnoreCase(complex_delimeter_level1) || - complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) || - delimeter.equalsIgnoreCase(complex_delimeter_level2)) { - CarbonException.analysisException(s"Field Delimiter and Complex types delimiter are same") - } else { - carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1) - carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2) - } - // set local dictionary path, and dictionary file extension - carbonLoadModel.setAllDictPath(all_dictionary_path) - carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)) - carbonLoadModel.setCsvHeader(fileHeader) - carbonLoadModel.setColDictFilePath(column_dict) - - val ignoreColumns = new util.ArrayList[String]() - if (!isDataFrame) { - ignoreColumns.addAll(partition.filter(_._2.isDefined).keys.toList.asJava) - } - carbonLoadModel.setCsvHeaderColumns( - CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns)) - - val validatedMaxColumns = CommonUtil.validateMaxColumns( - carbonLoadModel.getCsvHeaderColumns, - optionsFinal("maxcolumns")) - - carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) - if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) - } - } - - private def isLoadDeletionRequired(metaDataLocation: String): Boolean = { - val details = SegmentStatusManager.readLoadMetadata(metaDataLocation) - if (details != null && details.nonEmpty) for (oneRow <- details) { - if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus || - SegmentStatus.COMPACTED == oneRow.getSegmentStatus || - SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus || - SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus) && - oneRow.getVisibility.equalsIgnoreCase("true")) { - return true - } - } - false - } - - def deleteLoadsAndUpdateMetadata( - isForceDeletion: Boolean, - carbonTable: CarbonTable, - specs: util.List[PartitionSpec]): Unit = { - if (isLoadDeletionRequired(carbonTable.getMetadataPath)) { - val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - - val (details, updationRequired) = - isUpdationRequired( - isForceDeletion, - carbonTable, - absoluteTableIdentifier) - - - if (updationRequired) { - val carbonTableStatusLock = - CarbonLockFactory.getCarbonLockObj( - absoluteTableIdentifier, - LockUsage.TABLE_STATUS_LOCK - ) - var locked = false - var updationCompletionStaus = false - try { - // Update load metadate file after cleaning deleted nodes - locked = carbonTableStatusLock.lockWithRetries() - if (locked) { - LOGGER.info("Table status lock has been successfully acquired.") - // Again read status and check to verify updation required or not. - val (details, updationRequired) = - isUpdationRequired( - isForceDeletion, - carbonTable, - absoluteTableIdentifier) - if (!updationRequired) { - return - } - // read latest table status again. - val latestMetadata = SegmentStatusManager - .readLoadMetadata(carbonTable.getMetadataPath) - - // update the metadata details from old to new status. - val latestStatus = CarbonLoaderUtil - .updateLoadMetadataFromOldToNew(details, latestMetadata) - - CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus) - } else { - val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName - val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName - val errorMsg = "Clean files request is failed for " + - s"$dbName.$tableName" + - ". Not able to acquire the table status lock due to other operation " + - "running in the background." - LOGGER.audit(errorMsg) - LOGGER.error(errorMsg) - throw new Exception(errorMsg + " Please try after some time.") - } - updationCompletionStaus = true - } finally { - if (locked) { - CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) - } - } - if (updationCompletionStaus) { - DeleteLoadFolders - .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier, - carbonTable.getMetadataPath, isForceDeletion, specs) - } - } - } - } - - private def isUpdationRequired(isForceDeletion: Boolean, - carbonTable: CarbonTable, - absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = { - val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) - // Delete marked loads - val isUpdationRequired = - DeleteLoadFolders.deleteLoadFoldersFromFileSystem( - absoluteTableIdentifier, - isForceDeletion, - details, - carbonTable.getMetadataPath - ) - (details, isUpdationRequired) - } - - /** - * creates a RDD that does reading of multiple CSV files - */ - def csvFileScanRDD( - spark: SparkSession, - model: CarbonLoadModel, - hadoopConf: Configuration - ): RDD[InternalRow] = { - // 1. partition - val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes - val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes - val defaultParallelism = spark.sparkContext.defaultParallelism - CommonUtil.configureCSVInputFormat(hadoopConf, model) - hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath) - val jobConf = new JobConf(hadoopConf) - SparkHadoopUtil.get.addCredentials(jobConf) - val jobContext = new JobContextImpl(jobConf, null) - val inputFormat = new CSVInputFormat() - val rawSplits = inputFormat.getSplits(jobContext).toArray - val splitFiles = rawSplits.map { split => - val fileSplit = split.asInstanceOf[FileSplit] - PartitionedFile( - InternalRow.empty, - fileSplit.getPath.toString, - fileSplit.getStart, - fileSplit.getLength, - fileSplit.getLocations) - }.sortBy(_.length)(implicitly[Ordering[Long]].reverse) - val totalBytes = splitFiles.map(_.length + openCostInBytes).sum - val bytesPerCore = totalBytes / defaultParallelism - - val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) - LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + - s"open cost is considered as scanning $openCostInBytes bytes.") - - val partitions = new ArrayBuffer[FilePartition] - val currentFiles = new ArrayBuffer[PartitionedFile] - var currentSize = 0L - - def closePartition(): Unit = { - if (currentFiles.nonEmpty) { - val newPartition = - FilePartition( - partitions.size, - currentFiles.toArray.toSeq) - partitions += newPartition - } - currentFiles.clear() - currentSize = 0 - } - - splitFiles.foreach { file => - if (currentSize + file.length > maxSplitBytes) { - closePartition() - } - // Add the given file to the current partition. - currentSize += file.length + openCostInBytes - currentFiles += file - } - closePartition() - - // 2. read function - val serializableConfiguration = new SerializableConfiguration(jobConf) - val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable { - override def apply(file: PartitionedFile): Iterator[InternalRow] = { - new Iterator[InternalRow] { - val hadoopConf = serializableConfiguration.value - val jobTrackerId: String = { - val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) - formatter.format(new Date()) - } - val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) - val inputSplit = - new FileSplit(new Path(file.filePath), file.start, file.length, file.locations) - var finished = false - val inputFormat = new CSVInputFormat() - val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext) - reader.initialize(inputSplit, hadoopAttemptContext) - - override def hasNext: Boolean = { - if (!finished) { - if (reader != null) { - if (reader.nextKeyValue()) { - true - } else { - finished = true - reader.close() - false - } - } else { - finished = true - false - } - } else { - false - } - } - - override def next(): InternalRow = { - new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]]) - } - } - } - } - new FileScanRDD(spark, readFunction, partitions) - } - -}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index 2bd4f45..fb9ecac 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -321,7 +321,7 @@ object GlobalDictionaryUtil { carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) // get load count if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) + carbonLoadModel.readAndSetLoadMetadataDetails() } val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table) DictionaryLoadModel( http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 84215fd..b1e4083 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.DataTypes @@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.processing.util.CarbonLoaderUtil -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil} /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 349c436..2c4d604 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -153,7 +153,7 @@ object CarbonDataRDDFactory { if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) { // update the updated table status. For the case of Update Delta Compaction the Metadata // is filled in LoadModel, no need to refresh. - CommonUtil.readLoadMetadataDetails(carbonLoadModel) + carbonLoadModel.readAndSetLoadMetadataDetails() } val compactionThread = new Thread { @@ -274,7 +274,7 @@ object CarbonDataRDDFactory { loadModel.setTableName(table.getCarbonTableIdentifier.getTableName) loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName) loadModel.setTablePath(table.getTablePath) - CommonUtil.readLoadMetadataDetails(loadModel) + loadModel.readAndSetLoadMetadataDetails() val loadStartTime = CarbonUpdateUtil.readCurrentTime() loadModel.setFactTimeStamp(loadStartTime) loadModel http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 1210b92..231b748 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -76,7 +76,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, } // scan again and determine if anything is there to merge again. - CommonUtil.readLoadMetadataDetails(carbonLoadModel) + carbonLoadModel.readAndSetLoadMetadataDetails() segList = carbonLoadModel.getLoadMetadataDetails // in case of major compaction we will scan only once and come out as it will keep // on doing major for the new loads also. http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 7d70534..adf5e04 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -35,13 +35,13 @@ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CarbonException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.spark.CarbonOption -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CarbonScalaUtil import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory} http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 0eb7ad5..6508777 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -23,10 +23,10 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._ -import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException} /** * Below command class will be used to create datamap on table http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index 2675036..dcc71a2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} @@ -34,7 +35,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.events._ -import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, NoSuchDataMapException} /** * Drops the datamap and any related tables associated with the datamap http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index acc3358..e47c500 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -21,18 +21,20 @@ import java.io.{File, IOException} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext} +import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand} +import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel} import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.AlterTableUtil +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.mutate.CarbonUpdateUtil @@ -44,7 +46,6 @@ import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} -import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException} import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory import org.apache.carbondata.spark.util.CommonUtil import org.apache.carbondata.streaming.StreamHandoffRDD @@ -103,7 +104,6 @@ case class CarbonAlterTableCompactionCommand( .fireEvent(alterTableCompactionExceptionEvent, operationContext) compactionException = operationContext.getProperty("compactionException").toString } - if (compactionException.equalsIgnoreCase("true") && null == compactionType) { throw new MalformedCarbonCommandException( "Unsupported alter operation on carbon table") @@ -159,8 +159,7 @@ case class CarbonAlterTableCompactionCommand( operationContext: OperationContext): Unit = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase) - val compactionSize: Long = CarbonDataMergerUtil - .getCompactionSize(compactionType, carbonLoadModel) + val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel) if (CompactionType.IUD_UPDDEL_DELTA == compactionType) { if (alterTableModel.segmentUpdateStatusManager.isDefined) { carbonLoadModel.setSegmentUpdateStatusManager( @@ -175,7 +174,7 @@ case class CarbonAlterTableCompactionCommand( val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) + carbonLoadModel.readAndSetLoadMetadataDetails() } if (compactionType == CompactionType.STREAMING) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index a42031d..70134a6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -46,6 +46,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -70,12 +71,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} +import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.loading.sort.SortScopeOptions import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil} import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} @@ -174,7 +178,7 @@ case class CarbonLoadDataCommand( val carbonLoadModel = new CarbonLoadModel() try { val tableProperties = table.getTableInfo.getFactTable.getTableProperties - val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options) + val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, @@ -189,10 +193,8 @@ case class CarbonLoadDataCommand( carbonLoadModel.setAggLoadRequest( internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) - DataLoadingUtil.buildCarbonLoadModel( - table, - carbonProperty, - options, + new CarbonLoadModelBuilder(table).build( + options.asJava, optionsFinal, carbonLoadModel, hadoopConf, @@ -221,7 +223,7 @@ case class CarbonLoadDataCommand( carbonLoadModel, factPath, dataFrame.isDefined, - optionsFinal.asJava, + optionsFinal, options.asJava, isOverwriteTable) operationContext.setProperty("isOverwrite", isOverwriteTable) @@ -229,6 +231,7 @@ case class CarbonLoadDataCommand( // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") // Clean up the old invalid segment data before creating a new entry for new load. + SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false) DataLoadingUtil.deleteLoadsAndUpdateMetadata( isForceDeletion = false, table, http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index a37d6dc..f074285 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.FailureCauses -import org.apache.carbondata.spark.exception.ConcurrentOperationException /** * IUD update delete and compaction framework. http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 4886676..5165342 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types.ArrayType import org.apache.spark.storage.StorageLevel +import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment @@ -34,7 +35,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent} import org.apache.carbondata.processing.loading.FailureCauses -import org.apache.carbondata.spark.exception.ConcurrentOperationException private[sql] case class CarbonProjectForUpdateCommand( plan: LogicalPlan, http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala index 5817d88..220d75d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala @@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.HiveSessionCatalog +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** * Util for IUD common function http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 4d0a4c5..c836584 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.spark.util.DataLoadingUtil /** * Below command class will be used to create pre-aggregate table @@ -179,11 +178,7 @@ case class CreatePreAggregateTableCommand( // This will be used to check if the parent table has any segments or not. If not then no // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT // table. - DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, - parentTable, - CarbonFilters.getCurrentPartitions(sparkSession, - TableIdentifier(parentTable.getTableName, - Some(parentTable.getDatabaseName))).map(_.asJava).orNull) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false) val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath) if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS || load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index ebf87d2..a5d256c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.types.DataType +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} @@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchem import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.format.TableInfo -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 9fde675..cf77e0f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.util.AlterTableUtil +import org.apache.carbondata.common.exceptions.ConcurrentOperationException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -37,7 +39,6 @@ import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.SchemaEvolutionEntry -import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException} private[sql] case class CarbonAlterTableRenameCommand( alterTableRenameModel: AlterTableRenameModel) http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala index 45767da..2b35416 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -18,12 +18,12 @@ package org.apache.spark.sql.execution.command.timeseries import org.apache.spark.sql.execution.command.{DataMapField, Field} +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES import org.apache.carbondata.core.metadata.schema.datamap.Granularity import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.preagg.TimeSeriesUDF -import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException} /** * Utility class for time series to keep http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index b4d3bea..61a31a5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -50,6 +50,9 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter import org.apache.carbondata.hadoop.internal.ObjectArrayWritable import org.apache.carbondata.hadoop.util.ObjectSerializationUtil +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util} @@ -87,7 +90,7 @@ with Serializable { TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession) val model = new CarbonLoadModel val carbonProperty = CarbonProperties.getInstance() - val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options) + val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) val tableProperties = table.getTableInfo.getFactTable.getTableProperties optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, @@ -102,14 +105,11 @@ with Serializable { val optionsLocal = new mutable.HashMap[String, String]() optionsLocal ++= options optionsLocal += (("header", "false")) - DataLoadingUtil.buildCarbonLoadModel( - table, - carbonProperty, - optionsLocal.toMap, + new CarbonLoadModelBuilder(table).build( + optionsLocal.toMap.asJava, optionsFinal, model, - conf - ) + conf) model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean) model.setDictionaryServerHost(options.getOrElse("dicthost", null)) model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index dcbce84..ec20ec2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.datasources.RefreshTable import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.util.{CarbonReflectionUtils, FileUtils} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.merger.CompactionType -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** * Carbon strategies for ddl commands http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala index 608ec60..7028dcf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.AlterTableRenameCommand import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand} import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException /** * Strategy for streaming table, like blocking unsupported operation http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index 7ca34af..27c7d17 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) extends RunnableCommand { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 86790ba..c858a8f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.spark.CarbonOption -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index ad6d0c7..ef4836e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -32,10 +32,10 @@ import org.apache.spark.sql.types.StructField import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.hadoop.util.SchemaReader import org.apache.carbondata.spark.CarbonOption -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index bc36e9c..aaa87a3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog} import org.apache.spark.sql.hive.HiveExternalCatalog._ +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -39,7 +40,6 @@ import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException object AlterTableUtil { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala index bc62902..a8094b6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala @@ -19,10 +19,10 @@ package org.apache.spark.util import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** * table api util http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index e82b485..f033a8e 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -16,9 +16,6 @@ */ package org.apache.spark.sql.hive - -import scala.collection.generic.SeqFactory - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql._ @@ -46,10 +43,8 @@ import org.apache.spark.sql.types.DecimalType import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.DataMapStoreManager -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CarbonScalaUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala index c676b01..5ade510 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala @@ -4,7 +4,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException /** * Created by rahul on 19/9/17. http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index 1d41ddc..3298009 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.util.TableOptionConstant /** @@ -63,7 +63,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns( - CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) + LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) // Create table and metadata folders if not exist val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) val fileType = FileFactory.getFileType(metadataDirectoryPath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index 7ca0b56..43d8c03 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -23,14 +23,14 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.exception.DataLoadingException -import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.util.TableOptionConstant -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** * test case for external column dictionary generation @@ -176,7 +176,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns( - CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) + LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) carbonLoadModel.setMaxColumns("100") // Create table and metadata folders if not exist val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 7dc6275..3b599fc 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -32,14 +32,12 @@ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException} -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.exception.ProcessMetaDataException class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala index 9da7244..65a006b 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala @@ -21,11 +21,11 @@ import org.apache.spark.sql.common.util.Spark2QueryTest import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index ac10b9a..995f041 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -26,9 +26,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest import org.apache.spark.sql.test.TestQueryExecutor import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException} class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index 24b25b9..bf1dfd1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -18,14 +18,16 @@ package org.apache.carbondata.processing.loading.model; import java.io.Serializable; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; - +import org.apache.carbondata.core.util.path.CarbonTablePath; public class CarbonLoadModel implements Serializable { @@ -797,6 +799,7 @@ public class CarbonLoadModel implements Serializable { this.skipEmptyLine = skipEmptyLine; } + public boolean isPartitionLoad() { return isPartitionLoad; } @@ -812,4 +815,13 @@ public class CarbonLoadModel implements Serializable { public void setDataWritePath(String dataWritePath) { this.dataWritePath = dataWritePath; } + + /** + * Read segments metadata from table status file and set it to this load model object + */ + public void readAndSetLoadMetadataDetails() { + String metadataPath = CarbonTablePath.getMetadataPath(tablePath); + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath); + setLoadMetadataDetails(Arrays.asList(details)); + } }
