 File path: 
 @@ -17,85 +17,517 @@
 package org.apache.spark.sql.execution.command.management
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
-import org.apache.spark.storage.StorageLevel
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, 
CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+* insert into without df, by just using logical plan
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+  var logicalPartitionRelation: LogicalRelation = _
+  var sizeInBytes: Long = _
+  var currPartitions: util.List[PartitionSpec] = _
+  var parentTablePath: String = _
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  var scanResultRdd: RDD[InternalRow] = _
+  var timeStampFormat: SimpleDateFormat = _
+  var dateFormat: SimpleDateFormat = _
+  var finalPartition: Map[String, Option[String]] = Map.empty
+  var isInsertIntoWithConverterFlow: Boolean = false
+  var dataFrame: DataFrame = _
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, 
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (!tableInfoOp.isDefined) {
+      throw new RuntimeException(" table info must be present when logical 
relation exist")
+    // If logical plan is unresolved, need to convert it to resolved.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    // Currently projection re-ordering is based on schema ordinal,
+    // for some scenarios in alter table scenario, schema ordinal logic cannot 
be applied.
+    // So, sending it to old flow
+    // TODO: Handle alter table in future, this also must use new flow.
+    if (CarbonProperties.isBadRecordHandlingEnabledForInsert ||
+        isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), 
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || 
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) 
= CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp = databaseNameOp,
+        tableName = tableName,
+        options = options,
+        isOverwriteTable = isOverwriteTable,
+        dimFilesPath = dimFilesPath,
+        dataFrame = dataFrame,
+        inputSqlString = inputSqlString,
+        updateModel = None,
+        tableInfoOp = tableInfoOp,
+        internalOptions = internalOptions,
+        partition = partition).process(sparkSession)
+    }
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", 
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
+    val optionsFinal: util.Map[String, String] = 
CommonLoadUtils.getFinalLoadOptions(table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf = hadoopConf,
+      factPath = factPath,
+      optionsFinal = optionsFinal,
+      parentTablePath = parentTablePath,
+      table = table,
+      isDataFrame = true,
+      internalOptions = internalOptions,
+      partition = partition,
+      options = options)
+    val (tf, df) = 
+    timeStampFormat = tf
+    dateFormat = df
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 
0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = 
+    val (reArrangedIndex, selectedColumnSchema) = 
+      tableInfoOp.get,
+      partitionColumnSchema)
+    val newLogicalPlan = getReArrangedLogicalPlan(
+      reArrangedIndex,
+      selectedColumnSchema,
+      convertedStaticPartition)
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, 
+    }
+    // Delete stale segment folders that are not in table status but are 
physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table 
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // Clean up the old invalid segment data before creating a new entry for 
new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, 
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in 
+      }
+      // Create table and metadata folders if not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = 
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in table status file for changing the status 
to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, 
+      // In case of event related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in table status file for changing the status 
to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+  def getReArrangedLogicalPlan(
+      reArrangedIndex: Seq[Int],
+      selectedColumnSchema: Seq[ColumnSchema],
+      convertedStaticPartition: mutable.Map[String, AnyRef]): LogicalPlan = {
+    var processedProject: Boolean = false
+    // check first node is the projection or not
+    logicalPlan match {
+      case _: Project =>
+      // project is already present as first node
+      case _ =>
+        // If project is not present, add the projection to re-arrange it
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, 
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
