http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 0064c21..f9f556d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -35,9 +35,10 @@ import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -359,7 +360,8 @@ case class LoadTable(
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", 
"false")
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
       
.getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier,
@@ -407,31 +409,60 @@ case class LoadTable(
       val commentChar = options.getOrElse("commentchar", "#")
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = 
options.getOrElse("serialization_null_format", "\\N")
-      val badRecordsLoggerEnable = 
options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordActionValue = CarbonProperties.getInstance()
+      val badRecordsLoggerEnable = 
options.getOrElse("bad_records_logger_enable",
+        carbonProperty
+          
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+            
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))
+      val badRecordActionValue = carbonProperty
         .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-      val badRecordsAction = options.getOrElse("bad_records_action", 
badRecordActionValue)
-      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", 
"false")
+      val badRecordsAction = options.getOrElse("bad_records_action", 
carbonProperty
+        
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+          badRecordActionValue))
+      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", 
carbonProperty
+        
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          
CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))
       val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
       val complex_delimiter_level_1 = 
options.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = 
options.getOrElse("complex_delimiter_level_2", "\\:")
-      val dateFormat = options.getOrElse("dateformat", null)
+      val dateFormat = options.getOrElse("dateformat",
+        
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))
       ValidateUtil.validateDateFormat(dateFormat, table, tableName)
       val maxColumns = options.getOrElse("maxcolumns", null)
-      val sortScope = options.getOrElse("sort_scope", null)
+      val sortScope = options
+        .getOrElse("sort_scope",
+          
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+            carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+              CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))
       ValidateUtil.validateSortScope(table, sortScope)
-      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
-      val globalSortPartitions = options.getOrElse("global_sort_partitions", 
null)
+      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb",
+        
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+          
carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
+      val bad_record_path = options.getOrElse("bad_record_path",
+        
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+          
carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))
+      if (badRecordsLoggerEnable.toBoolean ||
+          LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
+        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+          sys.error("Invalid bad records location.")
+        }
+      }
+      carbonLoadModel.setBadRecordsLocation(bad_record_path)
+      val globalSortPartitions = options.getOrElse("global_sort_partitions",
+        carbonProperty
+          
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, 
null))
       ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
-      
carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      
carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+      carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
         CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       carbonLoadModel
@@ -449,7 +480,9 @@ case class LoadTable(
       carbonLoadModel.setSortScope(sortScope)
       carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
       carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
-      val useOnePass = options.getOrElse("single_pass", 
"false").trim.toLowerCase match {
+      val useOnePass = options.getOrElse("single_pass",
+        
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+          
CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase 
match {
         case "true" =>
           true
         case "false" =>
@@ -534,7 +567,7 @@ case class LoadTable(
                 allDictionaryPath)
           }
           // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = CarbonProperties.getInstance()
+          val dictionaryServerPort = carbonProperty
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.
@@ -776,13 +809,6 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
             CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
           }
         }
-        // delete bad record log after drop table
-        val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + 
tableName)
-        val badLogFileType = FileFactory.getFileType(badLogPath)
-        if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
-          val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
-          CarbonUtil.deleteFoldersAndFiles(file)
-        }
       }
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
new file mode 100644
index 0000000..51b29a1
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * To initialize dynamic values default param
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * To initialize dynamic param defaults along with usage docs
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+      SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        
.createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To enable/ disable carbon custom block distribution.")
+        .booleanConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+            
CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+    val BAD_RECORDS_LOGGER_ENABLE =
+      
SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      
SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      
SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide weather empty data to be considered bad/ good 
record.")
+        .booleanConf
+        
.createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        
.createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BATCH_SORT_SIZE_INMB =
+      
SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+        .doc("Property to specify batch sort size in MB.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    val SINGLE_PASS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+        .doc("Property to enable/disable single_pass.")
+        .booleanConf
+        
.createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    val BAD_RECORD_PATH =
+      
SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        
.createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      
SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure data format for date type columns.")
+        .stringConf
+        
.createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+
+  /**
+   * to set the dynamic properties default values
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    
sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+          
CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+    
sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    
sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      
carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    
sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      
CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    
sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+      
carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    
sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    
sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    
sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      
carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 3412fb0..41d6bd3 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkSqlAstBuilder
 import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, 
Field, PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
-import org.apache.carbondata.core.util.{SessionParams, 
ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, 
ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -43,8 +43,8 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: 
SparkSession) extends Ab
   private val substitutor = new VariableSubstitution(conf)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
-    val sessionParams : SessionParams = 
CarbonEnv.getInstance(sparkSession).sessionParams
-    ThreadLocalSessionParams.setSessionParams(sessionParams)
+    val carbonSessionInfo: CarbonSessionInfo = 
CarbonEnv.getInstance(sparkSession).carbonSessionInfo
+    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     try {
       super.parsePlan(sqlText)
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
new file mode 100644
index 0000000..846c4b6
--- /dev/null
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test Class for detailed query on timestamp datatypes
+ *
+ *
+ */
+class BadRecordPathLoadOptionTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+  var badRecordPath: String = null
+  override def beforeAll {
+    try {
+       badRecordPath = new File("./target/test/badRecords")
+        .getCanonicalPath.replaceAll("\\\\","/")
+      sql("drop table IF EXISTS salestest")
+    }
+  }
+
+  test("data load log file and csv file written at the configured location") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, 
country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
BY 'carbondata'""")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
+    sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH}=${badRecordPath}")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE salestest 
OPTIONS" +
+        "('bad_records_logger_enable'='true','bad_records_action'='redirect', 
'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"')")
+    val location: Boolean = isFilesWrittenAtBadStoreLocation
+    assert(location)
+  }
+
+  override def afterAll {
+    sql("drop table salestest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+  def isFilesWrittenAtBadStoreLocation: Boolean = {
+    val badStorePath = badRecordPath + "/default/salestest/0/0"
+    val carbonFile: CarbonFile = FileFactory
+      .getCarbonFile(badStorePath, FileFactory.getFileType(badStorePath))
+    var exists: Boolean = carbonFile.exists()
+    if (exists) {
+      val listFiles: Array[CarbonFile] = carbonFile.listFiles(new 
CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          if (file.getName.endsWith(".log") || file.getName.endsWith(".csv")) {
+            return true;
+          }
+          return false;
+        }
+      })
+      exists = listFiles.size > 0
+    }
+    return exists;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index 5e91574..6f57cd6 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -238,7 +238,6 @@ class DataLoadFailAllTypeSortTest extends QueryTest with 
BeforeAndAfterAll {
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
     }
   }
-
   //
   override def afterAll {
     sql("drop table IF EXISTS data_pm")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
new file mode 100644
index 0000000..18b4039
--- /dev/null
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.carbondata.commands
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+
+class SetCommandTestCase extends QueryTest with BeforeAndAfterAll{
+  override def beforeAll: Unit = {
+    sql("set carbon=true")
+  }
+  test("test set command") {
+    checkAnswer(sql("set"), sql("set"))
+  }
+
+  test("test set any value command") {
+    checkAnswer(sql("set carbon=false"), sql("set carbon"))
+  }
+
+  test("test set command for enable.unsafe.sort=true") {
+    checkAnswer(sql("set enable.unsafe.sort=true"), sql("set 
enable.unsafe.sort"))
+  }
+
+  test("test set command for enable.unsafe.sort for invalid option") {
+    try {
+      checkAnswer(sql("set enable.unsafe.sort=123"), sql("set 
enable.unsafe.sort"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  //is_empty_data_bad_record
+  test(s"test set command for" +
+       s" ${ 
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }=true") {
+    checkAnswer(sql(s"set ${
+      CarbonLoadOptionConstants
+        .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE
+    }=true"), sql(s"set ${ 
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }"))
+  }
+
+  test(s"test set command for ${
+    CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE} for 
invalid option") {
+    try {
+      checkAnswer(sql(s"set ${
+        CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE
+      }=123"), sql(s"set ${ 
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  test(s"test set command for ${
+    CarbonLoadOptionConstants
+      .CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD
+  }=true") {
+    checkAnswer(sql(s"set ${
+      CarbonLoadOptionConstants
+        .CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD
+    }=true"),
+      sql(s"set ${ 
CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD }"))
+  }
+
+  test(s"test set command for 
${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD} " +
+       s"for invalid option") {
+    try {
+      checkAnswer(
+        sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD}=123"),
+        sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  //carbon.custom.block.distribution
+  test("test set command for carbon.custom.block.distribution=true") {
+    checkAnswer(sql("set carbon.custom.block.distribution=true"),
+      sql("set carbon.custom.block.distribution"))
+  }
+
+  test("test set command for carbon.custom.block.distribution for invalid 
option") {
+    try {
+      checkAnswer(sql("set carbon.custom.block.distribution=123"),
+        sql("set carbon.custom.block.distribution"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // sort_scope
+  test(s"test set command for 
${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT") {
+    checkAnswer(sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT"),
+      sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}"))
+  }
+
+  test(s"test set command for 
${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE} for invalid option") {
+    try {
+      checkAnswer(sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=123"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // batch_sort_size_inmb
+  test(s"test set command for 
${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=4") {
+    checkAnswer(sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=4"),
+      sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}"))
+  }
+
+  test(s"test set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB} for invalid 
option") {
+    try {
+      checkAnswer(sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=hjf"),
+        sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // single_pass
+  test(s"test set command for 
${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true") {
+    checkAnswer(sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true"),
+      sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
+  }
+
+  test(s"test set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS} for 
invalid option") {
+    try {
+      checkAnswer(sql(s"set 
${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=123"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  override def afterAll {
+    sql("reset")
+    sql("set carbon=true")
+    checkAnswer(sql("set carbon"),
+      sql("set"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
 
b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
deleted file mode 100644
index 901df3b..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.constants;
-
-/**
- * enum to hold the bad record logger action
- */
-public enum LoggerAction {
-
-  FORCE("FORCE"), // data will be converted to null
-  REDIRECT("REDIRECT"), // no null conversion moved to bad record and written 
to raw csv
-  IGNORE("IGNORE"), // no null conversion moved to bad record and not written 
to raw csv
-  FAIL("FAIL");  //data loading will fail if a bad record is found
-  private String name;
-
-  LoggerAction(String name) {
-    this.name = name;
-  }
-
-  @Override public String toString() {
-    return this.name;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 7ec7933..bfc1be9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -158,6 +158,10 @@ public class CarbonLoadModel implements Serializable {
    * Batch sort size in mb.
    */
   private String batchSortSizeInMb;
+  /**
+   * bad record location
+   */
+  private String badRecordsLocation;
 
   /**
    * Number of partitions in global sort.
@@ -363,6 +367,7 @@ public class CarbonLoadModel implements Serializable {
     copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copy.sortScope = sortScope;
     copy.batchSortSizeInMb = batchSortSizeInMb;
+    copy.badRecordsLocation = badRecordsLocation;
     return copy;
   }
 
@@ -464,6 +469,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copyObj.sortScope = sortScope;
     copyObj.batchSortSizeInMb = batchSortSizeInMb;
+    copyObj.badRecordsLocation = badRecordsLocation;
     return copyObj;
   }
 
@@ -764,4 +770,12 @@ public class CarbonLoadModel implements Serializable {
   public void setGlobalSortPartitions(String globalSortPartitions) {
     this.globalSortPartitions = globalSortPartitions;
   }
+
+  public String getBadRecordsLocation() {
+    return badRecordsLocation;
+  }
+
+  public void setBadRecordsLocation(String badRecordsLocation) {
+    this.badRecordsLocation = badRecordsLocation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 3294d5f..5662a04 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -180,6 +181,8 @@ public final class DataLoadProcessBuilder {
         loadModel.getBatchSortSizeInMb());
     
configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
         loadModel.getGlobalSortPartitions());
+    
configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+        loadModel.getBadRecordsLocation());
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getFactTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
index 1cc043f..2bf8e16 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.newflow.sort;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Sort scope options
@@ -43,21 +44,7 @@ public class SortScopeOptions {
   }
 
   public static boolean isValidSortOption(String sortScope) {
-    if (sortScope == null) {
-      return false;
-    }
-    switch (sortScope.toUpperCase()) {
-      case "BATCH_SORT":
-        return true;
-      case "LOCAL_SORT":
-        return true;
-      case "GLOBAL_SORT":
-        return true;
-      case "NO_SORT":
-        return true;
-      default:
-        return false;
-    }
+    return CarbonUtil.isValidSortOption(sortScope);
   }
 
   public enum SortScope {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 000d0b9..62d6c94 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -24,11 +24,12 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.constants.LoggerAction;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -152,16 +153,22 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
-        identifier.getTableName() + '_' + System.currentTimeMillis(), 
getBadLogStoreLocation(
-        identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + 
identifier
-            .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + 
configuration.getSegmentId()
-            + CarbonCommonConstants.FILE_SEPARATOR + 
configuration.getTaskNo()),
+        identifier.getTableName() + '_' + System.currentTimeMillis(),
+        getBadLogStoreLocation(configuration,
+            identifier.getDatabaseName() + 
CarbonCommonConstants.FILE_SEPARATOR + identifier
+                .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + 
configuration
+                .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + 
configuration.getTaskNo()),
         badRecordsLogRedirect, badRecordsLoggerEnable, 
badRecordConvertNullDisable, isDataLoadFail);
   }
 
-  public static String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation =
-        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration 
configuration,
+      String storeLocation) {
+    String badLogStoreLocation = (String) configuration
+        
.getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     return badLogStoreLocation;
@@ -198,7 +205,7 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte
     // rename the bad record in progress to normal
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
-    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(
+    
CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
         identifier.getDatabaseName() + File.separator + 
identifier.getTableName()
             + File.separator + configuration.getSegmentId() + File.separator + 
configuration
             .getTaskNo());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index d6185ba..c6f83ed 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -24,13 +24,14 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.constants.LoggerAction;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -41,6 +42,7 @@ import 
org.apache.carbondata.processing.newflow.partition.Partitioner;
 import 
org.apache.carbondata.processing.newflow.partition.impl.HashPartitionerImpl;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
 import 
org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * Replace row data fields with dictionary values if column is configured 
dictionary encoded.
@@ -187,8 +189,12 @@ public class DataConverterProcessorWithBucketingStepImpl 
extends AbstractDataLoa
   }
 
   private String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation =
-        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    String badLogStoreLocation = (String) configuration
+        
.getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     return badLogStoreLocation;
@@ -200,6 +206,7 @@ public class DataConverterProcessorWithBucketingStepImpl 
extends AbstractDataLoa
       super.close();
       if (null != badRecordLogger) {
         badRecordLogger.closeStreams();
+        renameBadRecord(configuration);
       }
       if (converters != null) {
         for (RowConverter converter : converters) {
@@ -208,7 +215,15 @@ public class DataConverterProcessorWithBucketingStepImpl 
extends AbstractDataLoa
       }
     }
   }
-
+  private static void renameBadRecord(CarbonDataLoadConfiguration 
configuration) {
+    // rename the bad record in progress to normal
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    
CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
+        identifier.getDatabaseName() + File.separator + 
identifier.getTableName()
+            + File.separator + configuration.getSegmentId() + File.separator + 
configuration
+            .getTaskNo());
+  }
   @Override protected String getStepName() {
     return "Data Converter with Bucketing";
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 84e1f20..62f13db 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.DimensionType;
 import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -90,12 +91,18 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * @param configuration
    * @param storeLocation
    */
-  public static void renameBadRecordsFromInProgressToNormal(String 
storeLocation) {
+  public static void renameBadRecordsFromInProgressToNormal(
+      CarbonDataLoadConfiguration configuration, String storeLocation) {
     // get the base store location
-    String badLogStoreLocation =
-        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    String badLogStoreLocation = (String) configuration
+        
.getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     FileType fileType = FileFactory.getFileType(badLogStoreLocation);
@@ -466,7 +473,8 @@ public final class CarbonDataProcessorUtil {
       if 
(configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)
           == null) {
         batchSortSizeInMb = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, 
"0"));
+            .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+                CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT));
       } else {
         batchSortSizeInMb = Integer.parseInt(
             
configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git 
a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
 
b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index d5a4f02..fdbd2f8 100644
--- 
a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ 
b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -59,8 +59,9 @@ public class BlockIndexStoreTest extends TestCase {
 
   @BeforeClass public void setUp() {
        property = 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
-       
        
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
 "1");
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, 
"/tmp/carbon/badrecords");
     StoreCreator.createCarbonStore();
     CarbonProperties.getInstance().
         addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, 
"10");

Reply via email to