Repository: carbondata
Updated Branches:
  refs/heads/master b86ff926d -> b7b8073d6


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 3f86ca4..29d91d6 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -153,9 +153,7 @@ case class CarbonLoadDataCommand(
     } else {
       null
     }
-    if (table.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on 
unmanaged table")
-    }
+
     // get the value of 'spark.executor.cores' from spark conf, default value 
is 1
     val sparkExecutorCores = 
sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
     // get the value of 'carbon.number.of.cores.while.loading' from carbon 
properties,
@@ -192,6 +190,7 @@ case class CarbonLoadDataCommand(
         FileUtils.getPaths(factPathFromUser, hadoopConf)
       }
       carbonLoadModel.setFactFilePath(factPath)
+      
carbonLoadModel.setCarbonTransactionalTable(table.getTableInfo.isTransactionalTable)
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, 
"false").toBoolean)
       
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
@@ -267,10 +266,12 @@ case class CarbonLoadDataCommand(
           carbonLoadModel.setUseOnePass(false)
         }
         // Create table and metadata folders if not exist
-        val metadataDirectoryPath = 
CarbonTablePath.getMetadataPath(table.getTablePath)
-        val fileType = FileFactory.getFileType(metadataDirectoryPath)
-        if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
-          FileFactory.mkdirs(metadataDirectoryPath, fileType)
+        if (carbonLoadModel.isCarbonTransactionalTable) {
+          val metadataDirectoryPath = 
CarbonTablePath.getMetadataPath(table.getTablePath)
+          val fileType = FileFactory.getFileType(metadataDirectoryPath)
+          if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+            FileFactory.mkdirs(metadataDirectoryPath, fileType)
+          }
         }
         val partitionStatus = SegmentStatus.SUCCESS
         val columnar = sparkSession.conf.get("carbon.is.columnar.storage", 
"true").toBoolean

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index cd5d8f9..0fb05e0 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -55,8 +55,8 @@ case class CarbonShowLoadsCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
-    if (carbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on 
unmanaged table")
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
     CarbonStore.showSegments(
       limit,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 1b087bd..225237b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -44,8 +44,8 @@ private[sql] case class CarbonProjectForDeleteCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
-    if (carbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on 
unmanaged table")
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
 
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 24ac80c..d8379a7 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -57,8 +57,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
       return Seq.empty
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
-    if (carbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on 
unmanaged table")
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data 
update")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 59ec71c..807c925 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -123,6 +123,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
             "Schema of index files located in location is not matching with 
current table schema")
         }
         val loadModel = new CarbonLoadModel
+        loadModel.setCarbonTransactionalTable(true)
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
         // Create new entry in tablestatus file
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 756bc97..25c0559 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -138,6 +138,7 @@ case class CarbonAlterTableDropPartitionCommand(
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
       carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
       carbonLoadModel.setDatabaseName(table.getDatabaseName)
       carbonLoadModel.setTablePath(table.getTablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 929de0a..f4b6de0 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -149,6 +149,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
       carbonLoadModel.setTableName(table.getTableName)
       carbonLoadModel.setDatabaseName(table.getDatabaseName)
+      carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
       carbonLoadModel.setTablePath(tablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index af52d6b..dfcd12b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -77,8 +77,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
     var oldCarbonTable: CarbonTable = null
     oldCarbonTable = metastore.lookupRelation(Some(oldDatabaseName), 
oldTableName)(sparkSession)
       .asInstanceOf[CarbonRelation].carbonTable
-    if (oldCarbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on 
unmanaged table")
+    if (!oldCarbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
 
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 6266c53..16e99b5 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -90,7 +90,7 @@ case class CarbonCreateTableCommand(
       OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, 
operationContext)
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
       val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, 
tableIdentifier)
-      val isUnmanaged = tableInfo.isUnManagedTable
+      val isTransactionalTable = tableInfo.isTransactionalTable
       if (createDSTable) {
         try {
           val tablePath = tableIdentifier.getTablePath
@@ -133,7 +133,7 @@ case class CarbonCreateTableCommand(
                  |  tablePath "$tablePath",
                  |  path "$tablePath",
                  |  isExternal "$isExternal",
-                 |  isUnManaged "$isUnmanaged",
+                 |  isTransactional "$isTransactionalTable",
                  |  isVisible "$isVisible"
                  |  $carbonSchemaString)
                  |  $partitionString

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 53e1ed4..61df9b1 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -50,15 +50,22 @@ case class CarbonDropTableCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, 
LockUsage.DROP_TABLE_LOCK)
+
+
     val identifier = CarbonEnv.getIdentifier(databaseNameOp, 
tableName)(sparkSession)
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = 
ListBuffer()
     try {
+      carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
+      val locksToBeAcquired: List[String] = if 
(carbonTable.isTransactionalTable) {
+        List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
+      } else {
+        List.empty
+      }
       locksToBeAcquired foreach {
         lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock)
       }
-      carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
+
       if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
         throw new ConcurrentOperationException(carbonTable, "loading", "drop 
table")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index c58d02d..26d5330 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -115,8 +115,9 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat 
table")
-          } else if (carbonTable != null && 
carbonTable.getTableInfo.isUnManagedTable) {
-            throw new MalformedCarbonCommandException("Unsupported operation 
on unmanaged table")
+          } else if (carbonTable != null && 
!carbonTable.getTableInfo.isTransactionalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported operation on non transactional table")
           } else {
             ExecutedCommandExec(dataTypeChange) :: Nil
           }
@@ -133,8 +134,9 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat 
table")
-          } else if (carbonTable != null && 
carbonTable.getTableInfo.isUnManagedTable) {
-            throw new MalformedCarbonCommandException("Unsupported operation 
on unmanaged table")
+          } else if (carbonTable != null && 
!carbonTable.getTableInfo.isTransactionalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported operation on non transactional table")
           } else {
             ExecutedCommandExec(addColumn) :: Nil
           }
@@ -151,8 +153,9 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat 
table")
-          } else if (carbonTable != null && 
carbonTable.getTableInfo.isUnManagedTable) {
-            throw new MalformedCarbonCommandException("Unsupported operation 
on unmanaged table")
+          } else if (carbonTable != null && 
!carbonTable.getTableInfo.isTransactionalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported operation on non transactional table")
           } else {
             ExecutedCommandExec(dropColumn) :: Nil
           }
@@ -185,8 +188,9 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy {
         if (isCarbonTable) {
           val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
             
.lookupRelation(t)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-          if (carbonTable != null && 
carbonTable.getTableInfo.isUnManagedTable) {
-            throw new MalformedCarbonCommandException("Unsupported operation 
on unmanaged table")
+          if (carbonTable != null && 
!carbonTable.getTableInfo.isTransactionalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported operation on non transactional table")
           }
           if (!carbonTable.isHivePartitionTable) {
             ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil
@@ -238,8 +242,9 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy {
         // if the table has 'preaggregate' DataMap, it doesn't support 
streaming now
         val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           
.lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-        if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
-          throw new MalformedCarbonCommandException("Unsupported operation on 
unmanaged table")
+        if (carbonTable != null && 
!carbonTable.getTableInfo.isTransactionalTable) {
+          throw new MalformedCarbonCommandException(
+            "Unsupported operation on non transactional table")
         }
 
         // TODO remove this limitation later

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 36b6d96..c61471a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -104,7 +104,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
         CarbonRelation(database, tableName, 
CarbonSparkUtil.createSparkMeta(t), t)
       case None =>
         readCarbonSchema(absIdentifier,
-          parameters.getOrElse("isUnManaged", "false").toBoolean) match {
+          !parameters.getOrElse("isTransactional", "true").toBoolean) match {
           case Some(meta) =>
             CarbonRelation(database, tableName,
               CarbonSparkUtil.createSparkMeta(meta), meta)
@@ -230,7 +230,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
         schemaConverter
           .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, 
tablePath)
       wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true")
-      wrapperTableInfo.setUnManagedTable(true)
+      wrapperTableInfo.setTransactionalTable(false)
       Some(wrapperTableInfo)
     } else {
       val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
@@ -474,7 +474,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
       DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     } else {
-      if (isUnmanagedCarbonTable(absoluteTableIdentifier)) {
+      if (!isTransactionalCarbonTable(absoluteTableIdentifier)) {
         removeTableFromMetadata(dbName, tableName)
         CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, 
sparkSession)
         // discard cached table info in cachedDataSourceTables
@@ -486,9 +486,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
   }
 
 
-  def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = {
-    val table = getTableFromMetadataCache(identifier.getDatabaseName, 
identifier.getTableName);
-    table.map(_.getTableInfo.isUnManagedTable).getOrElse(false)
+  def isTransactionalCarbonTable(identifier: AbsoluteTableIdentifier): Boolean 
= {
+    val table = getTableFromMetadataCache(identifier.getDatabaseName, 
identifier.getTableName)
+    table.map(_.getTableInfo.isTransactionalTable).getOrElse(true)
   }
 
   private def getTimestampFileAndType() = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 33eac61..aacbdd0 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -258,7 +258,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     }
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
-    var unManagedTable : Boolean = false
+    var isTransactionalTable : Boolean = true
 
     val tableInfo = if (external) {
       // read table info from schema file in the provided table path
@@ -272,7 +272,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
           if (provider.equalsIgnoreCase("'carbonfile'")) {
             SchemaReader.inferSchema(identifier, true)
           } else {
-            unManagedTable = true
+            isTransactionalTable = false
             SchemaReader.inferSchema(identifier, false)
           }
         }
@@ -307,7 +307,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         tableComment)
       TableNewProcessor(tableModel)
     }
-    tableInfo.setUnManagedTable(unManagedTable)
+    tableInfo.setTransactionalTable(isTransactionalTable)
     selectQuery match {
       case query@Some(q) =>
         CarbonCreateTableAsSelectCommand(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 43d8c03..d98229a 100644
--- 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -158,6 +158,7 @@ class ExternalColumnDictionaryTestCase extends 
Spark2QueryTest with BeforeAndAft
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
     carbonLoadModel.setTableName(table.getTableName)
+    carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
     carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
     carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
     carbonLoadModel.setFactFilePath(filePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 829c17e..ad1c84c 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -110,7 +110,7 @@ public class CarbonDataLoadConfiguration {
 
   private SortColumnRangeInfo sortColumnRangeInfo;
 
-  private boolean carbonUnmanagedTable;
+  private boolean carbonTransactionalTable;
 
   /**
    * Flder path to where data should be written for this load.
@@ -380,11 +380,11 @@ public class CarbonDataLoadConfiguration {
     this.sortColumnRangeInfo = sortColumnRangeInfo;
   }
 
-  public boolean isCarbonUnmanagedTable() {
-    return carbonUnmanagedTable;
+  public boolean isCarbonTransactionalTable() {
+    return carbonTransactionalTable;
   }
 
-  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
-    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  public void setCarbonTransactionalTable(boolean carbonTransactionalTable) {
+    this.carbonTransactionalTable = carbonTransactionalTable;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 6d3f596..9c1d113 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -185,7 +185,7 @@ public final class DataLoadProcessBuilder {
     CarbonTable carbonTable = 
loadModel.getCarbonDataLoadSchema().getCarbonTable();
     AbsoluteTableIdentifier identifier = 
carbonTable.getAbsoluteTableIdentifier();
     configuration.setTableIdentifier(identifier);
-    configuration.setCarbonUnmanagedTable(loadModel.isCarbonUnmanagedTable());
+    
configuration.setCarbonTransactionalTable(loadModel.isCarbonTransactionalTable());
     
configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index fd39563..2b820d8 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -48,11 +48,11 @@ public class CarbonLoadModel implements Serializable {
   private String tablePath;
 
   /*
-     This points if the carbonTable is a Unmanaged Table or not.
+     This points if the carbonTable is a Non Transactional Table or not.
      The path will be pointed by the tablePath. And there will be
-     no Metadata folder present for the unmanaged Table.
+     no Metadata folder present for the Non Transactional Table.
    */
-  private boolean carbonUnmanagedTable;
+  private boolean carbonTransactionalTable = true;
 
   private String csvHeader;
   private String[] csvHeaderColumns;
@@ -417,7 +417,7 @@ public class CarbonLoadModel implements Serializable {
     copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
     copy.tablePath = tablePath;
-    copy.carbonUnmanagedTable = carbonUnmanagedTable;
+    copy.carbonTransactionalTable = carbonTransactionalTable;
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
@@ -471,7 +471,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.defaultTimestampFormat = defaultTimestampFormat;
     copyObj.maxColumns = maxColumns;
     copyObj.tablePath = tablePath;
-    copyObj.carbonUnmanagedTable = carbonUnmanagedTable;
+    copyObj.carbonTransactionalTable = carbonTransactionalTable;
     copyObj.useOnePass = useOnePass;
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
@@ -835,11 +835,11 @@ public class CarbonLoadModel implements Serializable {
     setLoadMetadataDetails(Arrays.asList(details));
   }
 
-  public boolean isCarbonUnmanagedTable() {
-    return carbonUnmanagedTable;
+  public boolean isCarbonTransactionalTable() {
+    return carbonTransactionalTable;
   }
 
-  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
-    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  public void setCarbonTransactionalTable(boolean carbonTransactionalTable) {
+    this.carbonTransactionalTable = carbonTransactionalTable;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 8eb5ed1..3385479 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -57,10 +57,11 @@ public class CarbonLoadModelBuilder {
   /**
    * build CarbonLoadModel for data loading
    * @param options Load options from user input
+   * @param taskNo
    * @return a new CarbonLoadModel instance
    */
-  public CarbonLoadModel build(
-      Map<String, String> options, long UUID) throws 
InvalidLoadOptionException, IOException {
+  public CarbonLoadModel build(Map<String, String> options, long UUID, String 
taskNo)
+      throws InvalidLoadOptionException, IOException {
     Map<String, String> optionsFinal = 
LoadOption.fillOptionWithDefaultValue(options);
 
     if (!options.containsKey("fileheader")) {
@@ -72,8 +73,9 @@ public class CarbonLoadModelBuilder {
       optionsFinal.put("fileheader", Strings.mkString(columns, ","));
     }
     CarbonLoadModel model = new CarbonLoadModel();
-    model.setCarbonUnmanagedTable(table.isUnManagedTable());
+    model.setCarbonTransactionalTable(table.isTransactionalTable());
     model.setFactTimeStamp(UUID);
+    model.setTaskNo(taskNo);
 
     // we have provided 'fileheader', so it hadoopConf can be null
     build(options, optionsFinal, model, null);
@@ -129,6 +131,7 @@ public class CarbonLoadModelBuilder {
     carbonLoadModel.setDatabaseName(table.getDatabaseName());
     carbonLoadModel.setTablePath(table.getTablePath());
     carbonLoadModel.setTableName(table.getTableName());
+    carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable());
     CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table);
     // Need to fill dimension relation
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 089b8c7..4ff1cce 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -244,7 +244,7 @@ public class LoadOption {
       }
     }
 
-    if (!carbonLoadModel.isCarbonUnmanagedTable() && !CarbonDataProcessorUtil
+    if (carbonLoadModel.isCarbonTransactionalTable() && 
!CarbonDataProcessorUtil
         .isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
             carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
       if (csvFile == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0625a66..aaf20c7 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -367,7 +367,7 @@ public class CarbonFactDataHandlerModel {
     }
     AbsoluteTableIdentifier absoluteTableIdentifier = 
configuration.getTableIdentifier();
     String carbonDataDirectoryPath;
-    if (configuration.isCarbonUnmanagedTable()) {
+    if (!configuration.isCarbonTransactionalTable()) {
       carbonDataDirectoryPath = absoluteTableIdentifier.getTablePath();
     } else {
       carbonDataDirectoryPath = CarbonTablePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 2b4748f..2b295d6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -159,6 +159,30 @@ public final class CarbonLoaderUtil {
   }
 
   /**
+   * This API deletes the content of the non Transactional Tables when insert 
overwrite is set true.
+   *
+   * @param loadModel
+   * @throws IOException
+   */
+  public static void deleteNonTransactionalTableForInsertOverwrite(final 
CarbonLoadModel loadModel)
+      throws IOException {
+    // We need to delete the content of the Table Path Folder except the
+    // Newly added file.
+    List<String> filesToBeDeleted = new ArrayList<>();
+    CarbonFile carbonFile = 
FileFactory.getCarbonFile(loadModel.getTablePath());
+    CarbonFile[] filteredList = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return !file.getName().contains(loadModel.getFactTimeStamp() + "");
+      }
+    });
+    for (CarbonFile file : filteredList) {
+      filesToBeDeleted.add(file.getAbsolutePath());
+    }
+
+    deleteFiles(filesToBeDeleted);
+  }
+
+  /**
    * This API will write the load level metadata for the loadmanagement module 
inorder to
    * manage the load and query execution management smoothly.
    *
@@ -169,8 +193,13 @@ public final class CarbonLoaderUtil {
    * @throws IOException
    */
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
-      CarbonLoadModel loadModel, boolean loadStartEntry, boolean 
insertOverwrite, String uuid)
+      final CarbonLoadModel loadModel, boolean loadStartEntry, boolean 
insertOverwrite, String uuid)
       throws IOException {
+    // For Non Transactional tables no need to update the the Table Status 
file.
+    if (!loadModel.isCarbonTransactionalTable()) {
+      return true;
+    }
+
     return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, 
insertOverwrite, uuid,
         new ArrayList<Segment>(), new ArrayList<Segment>());
   }
@@ -191,10 +220,12 @@ public final class CarbonLoaderUtil {
     boolean status = false;
     AbsoluteTableIdentifier identifier =
         
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    String metadataPath = 
CarbonTablePath.getMetadataPath(identifier.getTablePath());
-    FileType fileType = FileFactory.getFileType(metadataPath);
-    if (!FileFactory.isFileExist(metadataPath, fileType)) {
-      FileFactory.mkdirs(metadataPath, fileType);
+    if (loadModel.isCarbonTransactionalTable()) {
+      String metadataPath = 
CarbonTablePath.getMetadataPath(identifier.getTablePath());
+      FileType fileType = FileFactory.getFileType(metadataPath);
+      if (!FileFactory.isFileExist(metadataPath, fileType)) {
+        FileFactory.mkdirs(metadataPath, fileType);
+      }
     }
     String tableStatusPath;
     if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() 
&& !uuid.isEmpty()) {
@@ -432,6 +463,10 @@ public final class CarbonLoaderUtil {
     }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newLoadMetaEntry, status, 
model.getFactTimeStamp(), false);
+
+    if (!model.isCarbonTransactionalTable() && insertOverwrite) {
+      CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(model);
+    }
     boolean entryAdded = CarbonLoaderUtil
         .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, 
uuid);
     if (!entryAdded) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git 
a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java 
b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index aae6f03..16d4d53 100644
--- 
a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ 
b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -136,6 +136,7 @@ public class StoreCreator {
       
loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
       
loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
       
loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+      loadModel.setCarbonTransactionalTable(true);
       loadModel.setFactFilePath(factFilePath);
       loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
       loadModel.setTablePath(identifier.getTablePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 4e09553..de1e5be 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -55,8 +55,9 @@ public class CarbonWriterBuilder {
   private boolean persistSchemaFile;
   private int blockletSize;
   private int blockSize;
-  private boolean isUnManagedTable;
+  private boolean isTransactionalTable;
   private long UUID;
+  private String taskNo;
 
   /**
    * prepares the builder with the schema provided
@@ -91,6 +92,19 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * sets the taskNo for the writer. SDKs concurrently running
+   * will set taskNo in order to avoid conflits in file write.
+   * @param taskNo is the TaskNo user wants to specify. Mostly it system time.
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder taskNo(String taskNo) {
+    this.taskNo = taskNo;
+    return this;
+  }
+
+
+
+  /**
    * If set, create a schema file in metadata folder.
    * @param persist is a boolean value, If set, create a schema file in 
metadata folder
    * @return updated CarbonWriterBuilder
@@ -101,14 +115,14 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * If set true, writes the carbondata and carbonindex files in a flat folder 
structure
-   * @param isUnManagedTable is a boolelan value if set writes
+   * If set false, writes the carbondata and carbonindex files in a flat 
folder structure
+   * @param isTransactionalTable is a boolelan value if set to false then 
writes
    *                     the carbondata and carbonindex files in a flat folder 
structure
    * @return updated CarbonWriterBuilder
    */
-  public CarbonWriterBuilder unManagedTable(boolean isUnManagedTable) {
-    Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be 
null");
-    this.isUnManagedTable = isUnManagedTable;
+  public CarbonWriterBuilder isTransactionalTable(boolean 
isTransactionalTable) {
+    Objects.requireNonNull(isTransactionalTable, "Transactional Table should 
not be null");
+    this.isTransactionalTable = isTransactionalTable;
     return this;
   }
 
@@ -180,7 +194,7 @@ public class CarbonWriterBuilder {
     }
 
     // build LoadModel
-    return buildLoadModel(table, UUID);
+    return buildLoadModel(table, UUID, taskNo);
   }
 
   /**
@@ -209,7 +223,7 @@ public class CarbonWriterBuilder {
     }
     String tableName;
     String dbName;
-    if (!isUnManagedTable) {
+    if (isTransactionalTable) {
       tableName = "_tempTable";
       dbName = "_tempDB";
     } else {
@@ -223,7 +237,7 @@ public class CarbonWriterBuilder {
         .databaseName(dbName)
         .tablePath(path)
         .tableSchema(schema)
-        .isUnManagedTable(isUnManagedTable)
+        .isTransactionalTable(isTransactionalTable)
         .build();
     return table;
   }
@@ -261,13 +275,13 @@ public class CarbonWriterBuilder {
   /**
    * Build a {@link CarbonLoadModel}
    */
-  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID)
+  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID, String 
taskNo)
       throws InvalidLoadOptionException, IOException {
     Map<String, String> options = new HashMap<>();
     if (sortColumns != null) {
       options.put("sort_columns", Strings.mkString(sortColumns, ","));
     }
     CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
-    return builder.build(options, UUID);
+    return builder.build(options, UUID, taskNo);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 25c34e0..c30bd3a 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -66,6 +66,7 @@ public class AvroCarbonWriterTest {
       CarbonWriter writer = CarbonWriter.builder()
           .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
           .outputPath(path)
+          .isTransactionalTable(true)
           .buildWriterForAvroInput();
 
       for (int i = 0; i < 100; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index eecbf5f..41fde66 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -90,6 +90,7 @@ public class CSVCarbonWriterTest {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
           .withSchema(new Schema(fields))
+          .isTransactionalTable(true)
           .outputPath(path);
 
       CarbonWriter writer = builder.buildWriterForCSVInput();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
new file mode 100644
index 0000000..32fe6e8
--- /dev/null
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link CSVCarbonWriter}
+ */
+public class CSVNonTransactionalCarbonWriterTest {
+
+  @Test
+  public void testWriteFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testWriteFilesJsonSchema() throws IOException {
+    String path = "./testWriteFilesJsonSchema";
+    FileUtils.deleteDirectory(new File(path));
+
+    String schema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"}\n")
+        .append("]")
+        .toString();
+
+    writeFilesAndVerify(Schema.parseJson(schema), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path) {
+    writeFilesAndVerify(schema, path, null);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, String[] 
sortColumns) {
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, boolean 
persistSchema) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+  }
+
+  /**
+   * Invoke CarbonWriter API to write carbon files and assert the file is 
rewritten
+   * @param rows number of rows to write
+   * @param schema schema of the file
+   * @param path local write path
+   * @param sortColumns sort columns
+   * @param persistSchema true if want to persist schema file
+   * @param blockletSize blockletSize in the file, -1 for default size
+   * @param blockSize blockSize in the file, -1 for default size
+   */
+  private void writeFilesAndVerify(int rows, Schema schema, String path, 
String[] sortColumns,
+      boolean persistSchema, int blockletSize, int blockSize) {
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(schema)
+          .isTransactionalTable(false)
+          .uniqueIdentifier(System.currentTimeMillis())
+          .taskNo(Long.toString(System.nanoTime()))
+          .outputPath(path);
+      if (sortColumns != null) {
+        builder = builder.sortBy(sortColumns);
+      }
+      if (persistSchema) {
+        builder = builder.persistSchemaFile(true);
+      }
+      if (blockletSize != -1) {
+        builder = builder.withBlockletSize(blockletSize);
+      }
+      if (blockSize != -1) {
+        builder = builder.withBlockSize(blockSize);
+      }
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < rows; i++) {
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), 
String.valueOf((double) i / 2)});
+      }
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } catch (InvalidLoadOptionException l) {
+      l.printStackTrace();
+      Assert.fail(l.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+  }
+  
+
+  @Test
+  public void testAllPrimitiveDataType() throws IOException {
+    // TODO: write all data type and read by CarbonRecordReader to verify the 
content
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[9];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+    fields[2] = new Field("shortField", DataTypes.SHORT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(new Schema(fields))
+          .uniqueIdentifier(System.currentTimeMillis())
+          .isTransactionalTable(false)
+          .taskNo(Long.toString(System.nanoTime()))
+          .outputPath(path);
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < 100; i++) {
+        String[] row = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34"
+        };
+        writer.write(row);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Blocklet() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 
100);
+
+    // TODO: implement reader to verify the number of blocklet in the file
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Block() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 
2);
+
+    File segmentFolder = new File(path);
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(2, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testSortColumns() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
+
+    // TODO: implement reader and verify the data is sorted
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testPartitionOutput() {
+    // TODO: test write data with partition
+  }
+
+  @Test
+  public void testSchemaPersistence() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, true);
+
+    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
+    Assert.assertTrue(new File(schemaFile).exists());
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
deleted file mode 100644
index 4bcdfff..0000000
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.sdk.file;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test suite for {@link CSVCarbonWriter}
- */
-public class CSVUnManagedCarbonWriterTest {
-
-  @Test
-  public void testWriteFiles() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testWriteFilesJsonSchema() throws IOException {
-    String path = "./testWriteFilesJsonSchema";
-    FileUtils.deleteDirectory(new File(path));
-
-    String schema = new StringBuilder()
-        .append("[ \n")
-        .append("   {\"name\":\"string\"},\n")
-        .append("   {\"age\":\"int\"},\n")
-        .append("   {\"height\":\"double\"}\n")
-        .append("]")
-        .toString();
-
-    writeFilesAndVerify(Schema.parseJson(schema), path);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path) {
-    writeFilesAndVerify(schema, path, null);
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path, String[] 
sortColumns) {
-    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path, boolean 
persistSchema) {
-    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
-  }
-
-  /**
-   * Invoke CarbonWriter API to write carbon files and assert the file is 
rewritten
-   * @param rows number of rows to write
-   * @param schema schema of the file
-   * @param path local write path
-   * @param sortColumns sort columns
-   * @param persistSchema true if want to persist schema file
-   * @param blockletSize blockletSize in the file, -1 for default size
-   * @param blockSize blockSize in the file, -1 for default size
-   */
-  private void writeFilesAndVerify(int rows, Schema schema, String path, 
String[] sortColumns,
-      boolean persistSchema, int blockletSize, int blockSize) {
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(schema)
-          .unManagedTable(true)
-          .uniqueIdentifier(System.currentTimeMillis())
-          .outputPath(path);
-      if (sortColumns != null) {
-        builder = builder.sortBy(sortColumns);
-      }
-      if (persistSchema) {
-        builder = builder.persistSchemaFile(true);
-      }
-      if (blockletSize != -1) {
-        builder = builder.withBlockletSize(blockletSize);
-      }
-      if (blockSize != -1) {
-        builder = builder.withBlockSize(blockSize);
-      }
-
-      CarbonWriter writer = builder.buildWriterForCSVInput();
-
-      for (int i = 0; i < rows; i++) {
-        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), 
String.valueOf((double) i / 2)});
-      }
-      writer.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    } catch (InvalidLoadOptionException l) {
-      l.printStackTrace();
-      Assert.fail(l.getMessage());
-    }
-
-    File segmentFolder = new File(path);
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-  }
-  
-
-  @Test
-  public void testAllPrimitiveDataType() throws IOException {
-    // TODO: write all data type and read by CarbonRecordReader to verify the 
content
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[9];
-    fields[0] = new Field("stringField", DataTypes.STRING);
-    fields[1] = new Field("intField", DataTypes.INT);
-    fields[2] = new Field("shortField", DataTypes.SHORT);
-    fields[3] = new Field("longField", DataTypes.LONG);
-    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
-    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
-    fields[6] = new Field("dateField", DataTypes.DATE);
-    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
-    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
-
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(new Schema(fields))
-          .uniqueIdentifier(System.currentTimeMillis())
-          .unManagedTable(true)
-          .outputPath(path);
-
-      CarbonWriter writer = builder.buildWriterForCSVInput();
-
-      for (int i = 0; i < 100; i++) {
-        String[] row = new String[]{
-            "robot" + (i % 10),
-            String.valueOf(i),
-            String.valueOf(i),
-            String.valueOf(Long.MAX_VALUE - i),
-            String.valueOf((double) i / 2),
-            String.valueOf(true),
-            "2019-03-02",
-            "2019-02-12 03:03:34"
-        };
-        writer.write(row);
-      }
-      writer.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    }
-
-    File segmentFolder = new File(path);
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void test2Blocklet() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 
100);
-
-    // TODO: implement reader to verify the number of blocklet in the file
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void test2Block() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 
2);
-
-    File segmentFolder = new File(path);
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertEquals(2, dataFiles.length);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testSortColumns() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
-
-    // TODO: implement reader and verify the data is sorted
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testPartitionOutput() {
-    // TODO: test write data with partition
-  }
-
-  @Test
-  public void testSchemaPersistence() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path, true);
-
-    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
-    Assert.assertTrue(new File(schemaFile).exists());
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index 068164d..03aecb8 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -56,6 +56,7 @@ public class TestUtil {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
           .withSchema(schema)
+          .isTransactionalTable(true)
           .outputPath(path);
       if (sortColumns != null) {
         builder = builder.sortBy(sortColumns);

Reply via email to