This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b475ed  [CARBONDATA-3720] Support alter table scenario for new insert into flow
2b475ed is described below

commit 2b475edc24bbf78085f29ab55198ac7b4a6b24e7
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Sun Feb 23 11:04:30 2020 +0530

    [CARBONDATA-3720] Support alter table scenario for new insert into flow
    
    Why is this PR needed?
    Currently, the rearrange logic is based on schema ordinal.
    
    For alter table drop and add column scenarios, with or without partition, schema-ordinal-based rearrange may not work, as the index can fall outside the projection size. This logic becomes complex to handle.

    Hence, instead of using the schema ordinal for the rearrange, a position-map-based rearrange is implemented.
    
    What changes were proposed in this PR?
    Implemented a position-map-based rearrange.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3634
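
    For illustration, the following is a minimal, self-contained sketch of the
    position-map idea, not the actual CarbonData implementation: plain strings
    stand in for ColumnSchema/CarbonColumn objects and the column names are
    hypothetical (the real code is in the diff below).

    // Sketch only: rearrange internal-order columns back to create order by
    // looking up each column's position in a name -> index map, instead of
    // doing schema-ordinal arithmetic (which broke after alter table add/drop).
    object PositionMapRearrangeSketch {
      def main(args: Array[String]): Unit = {
        // Columns in create (user-visible) order.
        val createOrder = Seq("id", "name", "salary", "city")
        // Columns in the table's internal order (dimensions first, then measures).
        val internalOrder = Seq("id", "name", "city", "salary")
        // Position map: column name -> index in create order.
        val createOrderMap = createOrder.zipWithIndex.toMap
        // For each internal-order column, look up its create-order position.
        val reArrangedIndex = internalOrder.map(createOrderMap)
        println(reArrangedIndex) // List(0, 1, 3, 2)
      }
    }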
---
 .../management/CarbonInsertIntoCommand.scala       | 90 ++++++++--------------
 1 file changed, 31 insertions(+), 59 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 68189e2..25887e7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -26,9 +26,9 @@ import scala.collection.mutable
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
+import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.CausedBy
@@ -37,8 +37,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -97,15 +97,9 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
     // If logical plan is unresolved, need to convert it to resolved.
     dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
     logicalPlan = dataFrame.queryExecution.analyzed
-    // Currently projection re-ordering is based on schema ordinal,
-    // for some scenarios in alter table scenario, schema ordinal logic cannot be applied.
-    // So, sending it to old flow
-    // TODO: Handle alter table in future, this also must use new flow.
-    if (CarbonProperties.isBadRecordHandlingEnabledForInsert ||
-        isAlteredSchema(tableInfo.getFactTable)) {
+    if (CarbonProperties.isBadRecordHandlingEnabledForInsert) {
+      // use old converter flow
       isInsertIntoWithConverterFlow = true
-    }
-    if (isInsertIntoWithConverterFlow) {
       return Seq.empty
     }
    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
@@ -458,67 +452,45 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
     }
   }
 
-  private def isAlteredSchema(tableSchema: TableSchema): Boolean = {
-    if (tableInfo.getFactTable.getSchemaEvolution != null) {
-      tableInfo
-        .getFactTable
-        .getSchemaEvolution
-        .getSchemaEvolutionEntryList.asScala.exists(entry =>
-        (entry.getAdded != null && entry.getAdded.size() > 0) ||
-        (entry.getRemoved != null && entry.getRemoved.size() > 0)
-      )
-    } else {
-      false
-    }
-  }
-
   def getReArrangedIndexAndSelectedSchema(
       tableInfo: TableInfo,
      partitionColumnSchema: mutable.Buffer[ColumnSchema]): (Seq[Int], Seq[ColumnSchema]) = {
     var reArrangedIndex: Seq[Int] = Seq()
     var selectedColumnSchema: Seq[ColumnSchema] = Seq()
-    var complexChildCount: Int = 0
     var partitionIndex: Seq[Int] = Seq()
-    val columnSchema = tableInfo.getFactTable.getListOfColumns.asScala
+    // internal order ColumnSchema (non-flat structure)
+    val columnSchema = (table.getVisibleDimensions.asScala ++
+                        table.getVisibleMeasures.asScala).map(_.getColumnSchema)
     val partitionColumnNames = if (partitionColumnSchema != null) {
       partitionColumnSchema.map(x => x.getColumnName).toSet
     } else {
       null
     }
-    // get invisible column indexes, alter table scenarios can have it before or after new column
-    // dummy measure will have ordinal -1 and it is invisible, ignore that column.
-    // alter table old columns are just invisible columns with proper ordinal
-    val invisibleIndex = columnSchema.filter(col => col.isInvisible && col.getSchemaOrdinal != -1)
-      .map(col => col.getSchemaOrdinal)
-    columnSchema.filterNot(col => col.isInvisible).foreach {
+    var createOrderColumns = table.getCreateOrderColumn.asScala
+    val createOrderMap = mutable.Map[String, Int]()
+    if (partitionColumnNames != null) {
+      // For alter table drop/add column scenarios, partition column may not be in the end.
+      // Need to keep it in the end.
+      createOrderColumns = createOrderColumns.filterNot(col =>
+        partitionColumnNames.contains(col.getColumnSchema.getColumnName)) ++
+                           createOrderColumns.filter(col =>
+                             partitionColumnNames.contains(col.getColumnSchema.getColumnName))
+    }
+    createOrderColumns.zipWithIndex.map {
+      case (col, index) => createOrderMap.put(col.getColName, index)
+    }
+    columnSchema.foreach {
       col =>
         var skipPartitionColumn = false
-        if (col.getColumnName.contains(".")) {
-          // If the schema ordinal is -1,
-          // no need to consider it during shifting columns to derive new shifted ordinal
-          if (col.getSchemaOrdinal != -1) {
-            complexChildCount = complexChildCount + 1
-          }
+        if (partitionColumnNames != null &&
+            partitionColumnNames.contains(col.getColumnName)) {
+          partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
+          skipPartitionColumn = true
         } else {
-          // get number of invisible index count before this column
-          val invisibleIndexCount = invisibleIndex.count(index => index < col.getSchemaOrdinal)
-          if (col.getDataType.isComplexType) {
-            // Calculate re-arrange index by ignoring the complex child count.
-            // As projection will have only parent columns
-            reArrangedIndex = reArrangedIndex :+
-                              (col.getSchemaOrdinal - complexChildCount - invisibleIndexCount)
-          } else {
-            if (partitionColumnNames != null &&
-                partitionColumnNames.contains(col.getColumnName)) {
-              partitionIndex = partitionIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
-              skipPartitionColumn = true
-            } else {
-              reArrangedIndex = reArrangedIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
-            }
-          }
-          if (!skipPartitionColumn) {
-            selectedColumnSchema = selectedColumnSchema :+ col
-          }
+          reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName)
+        }
+        if (!skipPartitionColumn) {
+          selectedColumnSchema = selectedColumnSchema :+ col
         }
     }
     if (partitionColumnSchema != null) {
