ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378800852
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,517 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+ * Insert into flow without a DataFrame, using just the logical plan.
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (!tableInfoOp.isDefined) {
+      throw new RuntimeException("table info must be present when logical relation exists")
     }
-
+    // If the logical plan is unresolved, convert it to a resolved plan.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    // Currently projection re-ordering is based on the schema ordinal.
+    // For some alter table scenarios, the schema ordinal logic cannot be applied,
+    // so such cases are sent to the old flow.
+    // TODO: Handle alter table in future; it must also use the new flow.
+    if (CarbonProperties.isBadRecordHandlingEnabledForInsert ||
+        isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp = databaseNameOp,
+        tableName = tableName,
+        options = options,
+        isOverwriteTable = isOverwriteTable,
+        dimFilesPath = dimFilesPath,
+        dataFrame = dataFrame,
+        inputSqlString = inputSqlString,
+        updateModel = None,
+        tableInfoOp = tableInfoOp,
+        internalOptions = internalOptions,
+        partition = partition).process(sparkSession)
+    }
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
+    val optionsFinal: util.Map[String, String] = CommonLoadUtils.getFinalLoadOptions(table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf = hadoopConf,
+      factPath = factPath,
+      optionsFinal = optionsFinal,
+      parentTablePath = parentTablePath,
+      table = table,
+      isDataFrame = true,
+      internalOptions = internalOptions,
+      partition = partition,
+      options = options)
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = getConvertedStaticPartitionMap(partitionColumnSchema)
+    val (reArrangedIndex, selectedColumnSchema) = getReArrangedIndexAndSelectedSchema(
+      tableInfoOp.get,
+      partitionColumnSchema)
+    val newLogicalPlan = getReArrangedLogicalPlan(
+      reArrangedIndex,
+      selectedColumnSchema,
+      convertedStaticPartition)
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table 
$dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // Clean up the old invalid segment data before creating a new entry for the new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in 
progress")
+      }
+      // Create table and metadata folders if not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, 
${ex.getMessage}")
+      // In case of event related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedLogicalPlan(
+      reArrangedIndex: Seq[Int],
+      selectedColumnSchema: Seq[ColumnSchema],
+      convertedStaticPartition: mutable.Map[String, AnyRef]): LogicalPlan = {
+    var processedProject: Boolean = false
+    // check whether the first node is already a Project
+    logicalPlan match {
+      case _: Project =>
+      // project is already present as first node
+      case _ =>
+        // If project is not present, add the projection to re-arrange it
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
+        var oldProjectionList = p.projectList
+        if (!processedProject) {
 
 Review comment:
   done
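
  For readers following the re-arrangement logic quoted above, here is a minimal,
  simplified sketch (not part of this patch) of re-ordering a Project node's
  projection list by an index sequence; the function name and parameters below are
  illustrative assumptions, not names taken from the PR.

      import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

      // Hypothetical helper: re-order the top-level projection of `plan` according to
      // `reArrangedIndex`, wrapping the plan in a Project first if it lacks one.
      def reArrangeProjection(plan: LogicalPlan, reArrangedIndex: Seq[Int]): LogicalPlan = {
        val withProject = plan match {
          case p: Project => p                        // a Project is already the top node
          case other => Project(other.output, other)  // add one so its output can be re-ordered
        }
        // pick the existing projection expressions in the target (schema-ordinal) order
        val reOrdered = reArrangedIndex.map(withProject.projectList)
        Project(reOrdered, withProject.child)
      }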

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
