http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 0064c21..f9f556d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -35,9 +35,10 @@ import org.apache.spark.util.FileUtils import org.codehaus.jackson.map.ObjectMapper import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree -import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} @@ -359,7 +360,8 @@ case class LoadTable( sys.error(s"Data loading failed. 
table not found: $dbName.$tableName") } - CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false") + val carbonProperty: CarbonProperties = CarbonProperties.getInstance() + carbonProperty.addProperty("zookeeper.enable.lock", "false") val carbonLock = CarbonLockFactory .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier .getCarbonTableIdentifier, @@ -407,31 +409,60 @@ case class LoadTable( val commentChar = options.getOrElse("commentchar", "#") val columnDict = options.getOrElse("columndict", null) val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N") - val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false") - val badRecordActionValue = CarbonProperties.getInstance() + val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", + carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)) + val badRecordActionValue = carbonProperty .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue) - val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false") + val badRecordsAction = options.getOrElse("bad_records_action", carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + badRecordActionValue)) + val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)) val allDictionaryPath = options.getOrElse("all_dictionary_path", "") val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$") val complex_delimiter_level_2 = 
options.getOrElse("complex_delimiter_level_2", "\\:") - val dateFormat = options.getOrElse("dateformat", null) + val dateFormat = options.getOrElse("dateformat", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)) ValidateUtil.validateDateFormat(dateFormat, table, tableName) val maxColumns = options.getOrElse("maxcolumns", null) - val sortScope = options.getOrElse("sort_scope", null) + val sortScope = options + .getOrElse("sort_scope", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, + carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))) ValidateUtil.validateSortScope(table, sortScope) - val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null) - val globalSortPartitions = options.getOrElse("global_sort_partitions", null) + val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, + carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))) + val bad_record_path = options.getOrElse("bad_record_path", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, + carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))) + if (badRecordsLoggerEnable.toBoolean || + LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) { + if (!CarbonUtil.isValidBadStorePath(bad_record_path)) { + sys.error("Invalid bad records location.") + } + } + carbonLoadModel.setBadRecordsLocation(bad_record_path) + val globalSortPartitions = options.getOrElse("global_sort_partitions", + carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)) 
ValidateUtil.validateGlobalSortPartitions(globalSortPartitions) carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\")) carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#")) carbonLoadModel.setDateFormat(dateFormat) - carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty( CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) - carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty( + carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty( CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel @@ -449,7 +480,9 @@ case class LoadTable( carbonLoadModel.setSortScope(sortScope) carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB) carbonLoadModel.setGlobalSortPartitions(globalSortPartitions) - val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match { + val useOnePass = options.getOrElse("single_pass", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, + CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match { case "true" => true case "false" => @@ -534,7 +567,7 @@ case class LoadTable( allDictionaryPath) } // dictionaryServerClient dictionary generator - val dictionaryServerPort = CarbonProperties.getInstance() + val dictionaryServerPort = carbonProperty .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT, CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT) val sparkDriverHost = sparkSession.sqlContext.sparkContext. 
@@ -776,13 +809,6 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, CarbonUtil.deleteFoldersAndFiles(file.getParentFile) } } - // delete bad record log after drop table - val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName) - val badLogFileType = FileFactory.getFileType(badLogPath) - if (FileFactory.isFileExist(badLogPath, badLogFileType)) { - val file = FileFactory.getCarbonFile(badLogPath, badLogFileType) - CarbonUtil.deleteFoldersAndFiles(file) - } } } Seq.empty
http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala new file mode 100644 index 0000000..51b29a1 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.internal + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.util.CarbonProperties + +/** + * To initialize dynamic values default param + */ +class CarbonSQLConf(sparkSession: SparkSession) { + + val carbonProperties = CarbonProperties.getInstance() + + /** + * To initialize dynamic param defaults along with usage docs + */ + def addDefaultCarbonParams(): Unit = { + val ENABLE_UNSAFE_SORT = + SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT) + .doc("To enable/ disable unsafe sort.") + .booleanConf + .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, + CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) + val CARBON_CUSTOM_BLOCK_DISTRIBUTION = + SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION) + .doc("To enable/ disable carbon custom block distribution.") + .booleanConf + .createWithDefault(carbonProperties + .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, + CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) + val BAD_RECORDS_LOGGER_ENABLE = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE) + .doc("To enable/ disable carbon bad record logger.") + .booleanConf + .createWithDefault(CarbonLoadOptionConstants + .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) + val BAD_RECORDS_ACTION = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) + .doc("To configure the bad records action.") + .stringConf + .createWithDefault(carbonProperties + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) + val IS_EMPTY_DATA_BAD_RECORD = + 
SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) + .doc("Property to decide weather empty data to be considered bad/ good record.") + .booleanConf + .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT + .toBoolean) + val SORT_SCOPE = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE) + .doc("Property to specify sort scope.") + .stringConf + .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) + val BATCH_SORT_SIZE_INMB = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB) + .doc("Property to specify batch sort size in MB.") + .stringConf + .createWithDefault(carbonProperties + .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) + val SINGLE_PASS = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS) + .doc("Property to enable/disable single_pass.") + .booleanConf + .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) + val BAD_RECORD_PATH = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH) + .doc("Property to configure the bad record location.") + .stringConf + .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) + val GLOBAL_SORT_PARTITIONS = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS) + .doc("Property to configure the global sort partitions.") + .stringConf + .createWithDefault(carbonProperties + .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, + CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) + val DATEFORMAT = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT) + .doc("Property to configure data format for date type 
columns.") + .stringConf + .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) + } + + /** + * to set the dynamic properties default values + */ + def addDefaultCarbonSessionParams(): Unit = { + sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT, + carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, + CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) + sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, + carbonProperties + .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, + CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, + carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, + carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, + CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, + carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + 
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, + carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, + carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, + CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 3412fb0..41d6bd3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkSqlAstBuilder import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel} import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} -import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams} +import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo} import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil @@ -43,8 +43,8 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) 
extends Ab private val substitutor = new VariableSubstitution(conf) override def parsePlan(sqlText: String): LogicalPlan = { - val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams - ThreadLocalSessionParams.setSessionParams(sessionParams) + val carbonSessionInfo: CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo + ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) try { super.parsePlan(sqlText) } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala new file mode 100644 index 0000000..846c4b6 --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.carbondata + +import java.io.File + +import org.apache.spark.sql.common.util.QueryTest +import org.apache.spark.sql.hive.HiveContext +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties + +/** + * Tests that bad record log and csv files are written to the configured bad record path + * + * + */ +class BadRecordPathLoadOptionTest extends QueryTest with BeforeAndAfterAll { + var hiveContext: HiveContext = _ + var badRecordPath: String = null + override def beforeAll { + try { + badRecordPath = new File("./target/test/badRecords") + .getCanonicalPath.replaceAll("\\\\","/") + sql("drop table IF EXISTS salestest") + } + } + + test("data load log file and csv file written at the configured location") { + sql( + """CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String, + actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + val csvFilePath = s"$resourcesPath/badrecords/datasample.csv" + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH}=${badRecordPath}") + sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE salestest OPTIONS" + + "('bad_records_logger_enable'='true','bad_records_action'='redirect', 'DELIMITER'=" + + " ',', 'QUOTECHAR'= '\"')") + val location: Boolean = isFilesWrittenAtBadStoreLocation + assert(location) + } + + override def afterAll { + sql("drop table salestest") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + } + + def isFilesWrittenAtBadStoreLocation: Boolean = { + val badStorePath = badRecordPath + 
"/default/salestest/0/0" + val carbonFile: CarbonFile = FileFactory + .getCarbonFile(badStorePath, FileFactory.getFileType(badStorePath)) + var exists: Boolean = carbonFile.exists() + if (exists) { + val listFiles: Array[CarbonFile] = carbonFile.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + if (file.getName.endsWith(".log") || file.getName.endsWith(".csv")) { + return true; + } + return false; + } + }) + exists = listFiles.size > 0 + } + return exists; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala index 5e91574..6f57cd6 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala @@ -238,7 +238,6 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) } } - // override def afterAll { sql("drop table IF EXISTS data_pm") http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala new file mode 100644 index 0000000..18b4039 --- /dev/null +++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.carbondata.commands + +import org.apache.spark.sql.common.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants +import org.apache.carbondata.core.exception.InvalidConfigurationException + +class SetCommandTestCase extends QueryTest with BeforeAndAfterAll{ + override def beforeAll: Unit = { + sql("set carbon=true") + } + test("test set command") { + checkAnswer(sql("set"), sql("set")) + } + + test("test set any value command") { + checkAnswer(sql("set carbon=false"), sql("set carbon")) + } + + test("test set command for enable.unsafe.sort=true") { + checkAnswer(sql("set enable.unsafe.sort=true"), sql("set enable.unsafe.sort")) + } + + test("test set command for enable.unsafe.sort for invalid option") { + try { + checkAnswer(sql("set enable.unsafe.sort=123"), sql("set enable.unsafe.sort")) + assert(false) + } catch { + case ex: InvalidConfigurationException => + assert(true) + } + } + // bad_records_logger_enable + test(s"test set command for" + + s" ${ 
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }=true") { + checkAnswer(sql(s"set ${ + CarbonLoadOptionConstants + .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE + }=true"), sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }")) + } + + test(s"test set command for ${ + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE} for invalid option") { + try { + checkAnswer(sql(s"set ${ + CarbonLoadOptionConstants + .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE + }=123"), sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }")) + assert(false) + } catch { + case ex: InvalidConfigurationException => + assert(true) + } + } + test(s"test set command for ${ + CarbonLoadOptionConstants + .CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD + }=true") { + checkAnswer(sql(s"set ${ + CarbonLoadOptionConstants + .CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD + }=true"), + sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD }")) + } + + test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD} " + + s"for invalid option") { + try { + checkAnswer( + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD}=123"), + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD}")) + assert(false) + } catch { + case ex: InvalidConfigurationException => + assert(true) + } + } + //carbon.custom.block.distribution + test("test set command for carbon.custom.block.distribution=true") { + checkAnswer(sql("set carbon.custom.block.distribution=true"), + sql("set carbon.custom.block.distribution")) + } + + test("test set command for carbon.custom.block.distribution for invalid option") { + try { + checkAnswer(sql("set carbon.custom.block.distribution=123"), + sql("set carbon.custom.block.distribution")) + assert(false) + } catch { + case ex: InvalidConfigurationException => + assert(true) + } + } + // sort_scope + test(s"test set 
command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT") { + checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT"), + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}")) + } + + test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE} for invalid option") { + try { + checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=123"), + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}")) + assert(false) + } catch { + case ex: InvalidConfigurationException => + assert(true) + } + } + // batch_sort_size_inmb + test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=4") { + checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=4"), + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}")) + } + + test(s"test set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB} for invalid option") { + try { + checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=hjf"), + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}")) + assert(false) + } catch { + case ex: InvalidConfigurationException => + assert(true) + } + } + // single_pass + test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true") { + checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true"), + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}")) + } + + test(s"test set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS} for invalid option") { + try { + checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=123"), + sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}")) + assert(false) + } catch { + case ex: InvalidConfigurationException => + assert(true) + } + } + override def afterAll { + sql("reset") + sql("set 
carbon=true") + checkAnswer(sql("set carbon"), + sql("set")) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java deleted file mode 100644 index 901df3b..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.processing.constants; - -/** - * enum to hold the bad record logger action - */ -public enum LoggerAction { - - FORCE("FORCE"), // data will be converted to null - REDIRECT("REDIRECT"), // no null conversion moved to bad record and written to raw csv - IGNORE("IGNORE"), // no null conversion moved to bad record and not written to raw csv - FAIL("FAIL"); //data loading will fail if a bad record is found - private String name; - - LoggerAction(String name) { - this.name = name; - } - - @Override public String toString() { - return this.name; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java index 7ec7933..bfc1be9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java @@ -158,6 +158,10 @@ public class CarbonLoadModel implements Serializable { * Batch sort size in mb. */ private String batchSortSizeInMb; + /** + * bad record location + */ + private String badRecordsLocation; /** * Number of partitions in global sort. 
@@ -363,6 +367,7 @@ public class CarbonLoadModel implements Serializable { copy.isEmptyDataBadRecord = isEmptyDataBadRecord; copy.sortScope = sortScope; copy.batchSortSizeInMb = batchSortSizeInMb; + copy.badRecordsLocation = badRecordsLocation; return copy; } @@ -464,6 +469,7 @@ public class CarbonLoadModel implements Serializable { copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord; copyObj.sortScope = sortScope; copyObj.batchSortSizeInMb = batchSortSizeInMb; + copyObj.badRecordsLocation = badRecordsLocation; return copyObj; } @@ -764,4 +770,12 @@ public class CarbonLoadModel implements Serializable { public void setGlobalSortPartitions(String globalSortPartitions) { this.globalSortPartitions = globalSortPartitions; } + + public String getBadRecordsLocation() { + return badRecordsLocation; + } + + public void setBadRecordsLocation(String badRecordsLocation) { + this.badRecordsLocation = badRecordsLocation; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java index 3294d5f..5662a04 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -26,6 +26,7 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.TableSpec; import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonMetadata; @@ -180,6 +181,8 @@ public final class DataLoadProcessBuilder { loadModel.getBatchSortSizeInMb()); configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, loadModel.getGlobalSortPartitions()); + configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, + loadModel.getBadRecordsLocation()); CarbonMetadata.getInstance().addCarbonTable(carbonTable); List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java index 1cc043f..2bf8e16 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java @@ -18,6 +18,7 @@ package org.apache.carbondata.processing.newflow.sort; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonUtil; /** * Sort scope options @@ -43,21 +44,7 @@ public class SortScopeOptions { } public static boolean isValidSortOption(String sortScope) { - if (sortScope == null) { - return false; - } - switch (sortScope.toUpperCase()) { - case "BATCH_SORT": - return true; - case "LOCAL_SORT": - return true; - case "GLOBAL_SORT": - return true; - case "NO_SORT": - return true; - default: - return false; - } + return CarbonUtil.isValidSortOption(sortScope); } public enum SortScope { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java index 000d0b9..62d6c94 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java @@ -24,11 +24,12 @@ import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.constants.LoggerAction; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.processing.constants.LoggerAction; import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.newflow.DataField; @@ -152,16 +153,22 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte CarbonTableIdentifier identifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); return new BadRecordsLogger(identifier.getBadRecordLoggerKey(), - identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation( - identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getTableName() + 
CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId() - + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()), + identifier.getTableName() + '_' + System.currentTimeMillis(), + getBadLogStoreLocation(configuration, + identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration + .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()), badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail); } - public static String getBadLogStoreLocation(String storeLocation) { - String badLogStoreLocation = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); + public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration, + String storeLocation) { + String badLogStoreLocation = (String) configuration + .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); + if (null == badLogStoreLocation) { + badLogStoreLocation = + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); + } badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; return badLogStoreLocation; @@ -198,7 +205,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte // rename the bad record in progress to normal CarbonTableIdentifier identifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); - CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal( + CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration, identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator + configuration.getSegmentId() + File.separator + configuration .getTaskNo()); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java index d6185ba..c6f83ed 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java @@ -24,13 +24,14 @@ import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.constants.LoggerAction; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.schema.BucketingInfo; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.processing.constants.LoggerAction; import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.newflow.DataField; @@ -41,6 +42,7 @@ import org.apache.carbondata.processing.newflow.partition.Partitioner; import org.apache.carbondata.processing.newflow.partition.impl.HashPartitionerImpl; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; import 
org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; /** * Replace row data fields with dictionary values if column is configured dictionary encoded. @@ -187,8 +189,12 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa } private String getBadLogStoreLocation(String storeLocation) { - String badLogStoreLocation = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); + String badLogStoreLocation = (String) configuration + .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); + if (null == badLogStoreLocation) { + badLogStoreLocation = + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); + } badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; return badLogStoreLocation; @@ -200,6 +206,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa super.close(); if (null != badRecordLogger) { badRecordLogger.closeStreams(); + renameBadRecord(configuration); } if (converters != null) { for (RowConverter converter : converters) { @@ -208,7 +215,15 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa } } } - + private static void renameBadRecord(CarbonDataLoadConfiguration configuration) { + // rename the bad record in progress to normal + CarbonTableIdentifier identifier = + configuration.getTableIdentifier().getCarbonTableIdentifier(); + CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration, + identifier.getDatabaseName() + File.separator + identifier.getTableName() + + File.separator + configuration.getSegmentId() + File.separator + configuration + .getTaskNo()); + } @Override protected String getStepName() { return "Data Converter with Bucketing"; } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 84e1f20..62f13db 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -32,6 +32,7 @@ import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.DimensionType; import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; @@ -90,12 +91,18 @@ public final class CarbonDataProcessorUtil { } /** + * @param configuration * @param storeLocation */ - public static void renameBadRecordsFromInProgressToNormal(String storeLocation) { + public static void renameBadRecordsFromInProgressToNormal( + CarbonDataLoadConfiguration configuration, String storeLocation) { // get the base store location - String badLogStoreLocation = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); + String badLogStoreLocation = (String) configuration + .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); + if (null == badLogStoreLocation) { + badLogStoreLocation = + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); + } badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; 
FileType fileType = FileFactory.getFileType(badLogStoreLocation); @@ -466,7 +473,8 @@ public final class CarbonDataProcessorUtil { if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB) == null) { batchSortSizeInMb = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0")); + .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)); } else { batchSortSizeInMb = Integer.parseInt( configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB) http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java index d5a4f02..fdbd2f8 100644 --- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java +++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java @@ -59,8 +59,9 @@ public class BlockIndexStoreTest extends TestCase { @BeforeClass public void setUp() { property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION); - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1"); + CarbonProperties.getInstance(). + addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords"); StoreCreator.createCarbonStore(); CarbonProperties.getInstance(). addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10");
