This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b7facc  [CARBONDATA-3637] Use optimized insert flow for MV and insert stage command
6b7facc is described below

commit 6b7faccde15371e316900bf2e3552c653b0c05e3
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Sat Feb 15 08:42:42 2020 +0530

    [CARBONDATA-3637] Use optimized insert flow for MV and insert stage command
    
    Why is this PR needed?
    MV and insert stage can use the new optimized insert into flow.
    
    Also, in the new insert into flow, one issue was found with the partition column and fixed.
    
    If the catalog table schema is already rearranged, don't rearrange it again.
    The timestamp converter did not handle the value 0; it must be set to null when it is 0.
    While deriving the indexes for the 3-part conversion in the new insert flow, the order was based on the internal partition order instead of the internal order.
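
    For illustration only, a minimal Scala sketch of the zero-timestamp handling described above
    (a hypothetical helper, not part of this commit; it assumes Spark's InternalRow setters and
    the microsecond-to-millisecond divisor of 1000 that the actual change reads from
    TimeStampGranularityTypeValue.MILLIS_SECONDS):

        import org.apache.spark.sql.catalyst.InternalRow

        object TimestampNormalizer {
          // A stored value of 0 means "no timestamp", so the field is marked
          // null instead of being converted to a bogus epoch value.
          def normalize(row: InternalRow, timestampIndexes: Seq[Int]): Unit = {
            for (index <- timestampIndexes) {
              if (row.getLong(index) == 0) {
                row.setNullAt(index)
              } else {
                // Spark keeps timestamps in microseconds; divide to get milliseconds (assumed divisor 1000).
                row.setLong(index, row.getLong(index) / 1000)
              }
            }
          }
        }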
    
    What changes were proposed in this PR?
    
    Changed the MV and insert stage commands to use the optimized insert flow.
    
    After these changes,
    b. CarbonInsertIntoCommand -- handles the insert DML, CTAS DML, MV, and insert stage commands.
    c. CarbonInsertIntoWithDf -- the old flow, which supports bad record handling with the converter step; it handles the update, compaction, DataFrame writer, and alter table scenarios [some problem in rearranging now].
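
    As a rough before/after sketch of the caller-side change (mirroring the
    CarbonInsertFromStageCommand hunk below; table, header, dataFrame, partition and spark are
    assumed to be in scope):

        // Old flow: DataFrame-based command that goes through the converter step
        CarbonInsertIntoWithDf(
          databaseNameOp = Option(table.getDatabaseName),
          tableName = table.getTableName,
          options = Map("fileheader" -> header),
          isOverwriteTable = false,
          dataFrame = dataFrame,
          updateModel = None,
          tableInfoOp = None,
          internalOptions = Map.empty,
          partition = partition
        ).process(spark)

        // New optimized flow: driven by the analyzed logical plan and the table info
        CarbonInsertIntoCommand(
          databaseNameOp = Option(table.getDatabaseName),
          tableName = table.getTableName,
          options = Map("fileheader" -> header),
          isOverwriteTable = false,
          logicalPlan = dataFrame.queryExecution.analyzed,
          tableInfo = table.getTableInfo,
          partition = partition
        ).run(spark)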
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3615
---
 .../management/CarbonInsertFromStageCommand.scala  | 10 ++-
 .../management/CarbonInsertIntoCommand.scala       | 13 +++-
 .../command/management/CommonLoadUtils.scala       | 10 ++-
 .../table/CarbonCreateTableAsSelectCommand.scala   | 10 +--
 .../mv/extension/MVDataMapProvider.scala           | 14 ++--
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   |  3 +
 .../timeseries/TestMVTimeSeriesLoadAndQuery.scala  | 13 ++++
 .../loading/CarbonDataLoadConfiguration.java       | 40 +++++------
 .../CarbonRowDataWriterProcessorStepImpl.java      | 78 +++++-----------------
 .../carbondata/processing/store/TablePage.java     |  8 ++-
 10 files changed, 89 insertions(+), 110 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index b44cfe5..64f802f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -321,18 +321,16 @@ case class CarbonInsertFromStageCommand(
         val header = columns.mkString(",")
         val selectColumns = columns.filter(!partition.contains(_))
         val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
-        CarbonInsertIntoWithDf(
+        CarbonInsertIntoCommand(
           databaseNameOp = Option(table.getDatabaseName),
           tableName = table.getTableName,
           options = scala.collection.immutable.Map("fileheader" -> header,
             "binary_decoder" -> "base64"),
           isOverwriteTable = false,
-          dataFrame = selectedDataFrame,
-          updateModel = None,
-          tableInfoOp = None,
-          internalOptions = Map.empty,
+          logicalPlan = selectedDataFrame.queryExecution.analyzed,
+          tableInfo = table.getTableInfo,
           partition = partition
-        ).process(spark)
+        ).run(spark)
     }
     LOGGER.info(s"finish data loading, time taken ${ 
System.currentTimeMillis() - start }ms")
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 525de29..68189e2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -176,8 +176,17 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
       convertedStaticPartition)
     scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
     if (logicalPartitionRelation != null) {
-      logicalPartitionRelation =
-        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+      if (selectedColumnSchema.length != logicalPartitionRelation.output.length) {
+        throw new RuntimeException(" schema length doesn't match partition length")
+      }
+      val isNotReArranged = selectedColumnSchema.zipWithIndex.exists {
+        case (cs, i) => !cs.getColumnName.equals(logicalPartitionRelation.output(i).name)
+      }
+      if (isNotReArranged) {
+        // Re-arrange the catalog table schema and output for partition relation
+        logicalPartitionRelation =
+          getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+      }
     }
     // Delete stale segment folders that are not in table status but are physically present in
     // the Fact folder
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 5c0780e..0aada09 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.{DateDirectDictionaryGenerator, TimeStampGranularityTypeValue}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
@@ -728,7 +728,13 @@ object CommonLoadUtils {
     }
     val updatedRdd: RDD[InternalRow] = rdd.map { internalRow =>
       for (index <- timeStampIndex) {
-        internalRow.setLong(index, internalRow.getLong(index) / 1000)
+        if (internalRow.getLong(index) == 0) {
+          internalRow.setNullAt(index)
+        } else {
+          internalRow.setLong(
+            index,
+            internalRow.getLong(index) / TimeStampGranularityTypeValue.MILLIS_SECONDS.getValue)
+        }
       }
       var doubleValue: Double = 0
       for (index <- doubleIndex) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
index ddfed80..9725b4e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -41,7 +41,7 @@ case class CarbonCreateTableAsSelectCommand(
     ifNotExistsSet: Boolean = false,
     tableLocation: Option[String] = None) extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonInsertIntoCommand = _
+  var insertIntoCommand: CarbonInsertIntoCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val tableName = tableInfo.getFactTable.getTableName
@@ -76,7 +76,7 @@ case class CarbonCreateTableAsSelectCommand(
         .createCarbonDataSourceHadoopRelation(sparkSession,
           TableIdentifier(tableName, Option(dbName)))
       // execute command to load data into carbon table
-      loadCommand = CarbonInsertIntoCommand(
+      insertIntoCommand = CarbonInsertIntoCommand(
         databaseNameOp = Some(carbonDataSourceHadoopRelation.carbonRelation.databaseName),
         tableName = carbonDataSourceHadoopRelation.carbonRelation.tableName,
         options = scala.collection.immutable
@@ -85,14 +85,14 @@ case class CarbonCreateTableAsSelectCommand(
         isOverwriteTable = false,
         logicalPlan = query,
         tableInfo = tableInfo)
-      loadCommand.processMetadata(sparkSession)
+      insertIntoCommand.processMetadata(sparkSession)
     }
     Seq.empty
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      loadCommand.processData(sparkSession)
+    if (null != insertIntoCommand) {
+      insertIntoCommand.processData(sparkSession)
     }
     Seq.empty
   }
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
index 994bc4d..5b400b2 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
@@ -21,7 +21,7 @@ import java.io.IOException
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonUtils, SparkSession}
-import org.apache.spark.sql.execution.command.management.CarbonInsertIntoWithDf
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -141,19 +141,17 @@ class MVDataMapProvider(
           !column.getColumnName
             .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
         }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
-      val insertWithDf = CarbonInsertIntoWithDf(
+      val insertIntoCommand = CarbonInsertIntoCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         options = scala.collection.immutable.Map("fileheader" -> header),
         isOverwriteTable,
-        dataFrame = updatedQuery,
-        updateModel = None,
-        tableInfoOp = None,
+        logicalPlan = updatedQuery.queryExecution.analyzed,
+        tableInfo = dataMapTable.getTableInfo,
         internalOptions = Map("mergedSegmentName" -> newLoadName,
-          CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
-        partition = Map.empty)
+          CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"))
       try {
-        insertWithDf.process(sparkSession)
+        insertIntoCommand.run(sparkSession)
       } catch {
         case ex: Exception =>
           // If load to dataMap table fails, disable the dataMap and if newLoad is still
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 55ab5e8..773482f 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -1160,6 +1160,8 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test mv with duplicate columns in query and constant column") {
+    // new optimized insert into flow doesn't support duplicate column names, so send it to old flow
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
     sql("drop table if exists maintable")
     sql("create table maintable(name string, age int, add string) STORED AS 
carbondata")
     sql("create materialized view dupli_mv as select name, sum(age),sum(age) 
from maintable group by name")
@@ -1178,6 +1180,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     assert(TestUtil.verifyMVDataMap(df4.queryExecution.optimizedPlan, "constant_mv"))
     assert(TestUtil.verifyMVDataMap(df5.queryExecution.optimizedPlan, "dupli_projection"))
     assert(TestUtil.verifyMVDataMap(df6.queryExecution.optimizedPlan, "dupli_projection"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
   }
 
   test("test mv query when the column names and table name same in join 
scenario") {
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
index 11b6078..00b6bd8 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.mv.rewrite.TestUtil
 
 class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
@@ -284,6 +286,9 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test mv timeseries duplicate columns and constant columns") {
+    // new optimized insert into flow doesn't support duplicate column names, so send it to old flow
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
     dropDataMap("datamap1")
     sql(
       "create materialized view datamap1 as " +
@@ -300,6 +305,8 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
     val df3 = sql("select timeseries(projectjoindate,'month') ,sum(1)  ex from maintable group by timeseries(projectjoindate,'month')")
     checkPlan("datamap1", df3)
     dropDataMap("datamap1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
   }
 
   test("test mv timeseries with like filters") {
@@ -313,6 +320,10 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test mv timeseries with join scenario") {
+    // this test case datamap table is created with distinct column (2 columns),
+    // but insert projection has duplicate column(3 columns). Cannot support in new insert into flow
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
     sql("drop table if exists secondtable")
     sql(
       "CREATE TABLE secondtable (empno int,empname string, projectcode int, 
projectjoindate " +
@@ -328,6 +339,8 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
       " from maintable t1 inner join secondtable t2 where" +
       " t2.projectcode = t1.projectcode group by 
timeseries(t1.projectjoindate,'month')")
     checkPlan("datamap1", df)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
   }
 
   test("test create materialized view with group by columns not present in 
projection") {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7575754..0965ee6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.OutputFilesInfoHolder;
 
 public class CarbonDataLoadConfiguration {
@@ -236,18 +238,12 @@ public class CarbonDataLoadConfiguration {
   }
 
   public DataType[] getMeasureDataType() {
-    List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
-    int measureCount = 0;
-    for (int i = 0; i < dataFields.length; i++) {
-      if (!dataFields[i].getColumn().isDimension()) {
-        measureIndexes.add(i);
-        measureCount++;
-      }
-    }
-
-    DataType[] type = new DataType[measureCount];
+    // data field might be rearranged in case of partition.
+    // so refer internal order not the data field order.
+    List<CarbonMeasure> visibleMeasures = tableSpec.getCarbonTable().getVisibleMeasures();
+    DataType[] type = new DataType[visibleMeasures.size()];
     for (int i = 0; i < type.length; i++) {
-      type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
+      type[i] = visibleMeasures.get(i).getDataType();
     }
     return type;
   }
@@ -258,22 +254,16 @@ public class CarbonDataLoadConfiguration {
    * @return
    */
   public CarbonColumn[] getNoDictAndComplexDimensions() {
-    List<Integer> noDicOrCompIndexes = new ArrayList<>(dataFields.length);
-    int noDicCount = 0;
-    for (int i = 0; i < dataFields.length; i++) {
-      if (dataFields[i].getColumn().isDimension() && (
-          dataFields[i].getColumn().getDataType() != DataTypes.DATE || dataFields[i].getColumn()
-              .isComplex())) {
-        noDicOrCompIndexes.add(i);
-        noDicCount++;
+    // data field might be rearranged in case of partition.
+    // so refer internal order not the data field order.
+    List<CarbonDimension> visibleDimensions = tableSpec.getCarbonTable().getVisibleDimensions();
+    List<CarbonColumn> noDictionaryDimensions = new ArrayList<>();
+    for (int i = 0; i < visibleDimensions.size(); i++) {
+      if (visibleDimensions.get(i).getDataType() != DataTypes.DATE) {
+        noDictionaryDimensions.add(visibleDimensions.get(i));
       }
     }
-
-    CarbonColumn[] dims = new CarbonColumn[noDicCount];
-    for (int i = 0; i < dims.length; i++) {
-      dims[i] = dataFields[noDicOrCompIndexes.get(i)].getColumn();
-    }
-    return dims;
+    return noDictionaryDimensions.toArray(new CarbonColumn[0]);
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 93392b5..0f5b203 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -52,7 +52,6 @@ import org.apache.carbondata.processing.store.CarbonFactHandler;
 import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.log4j.Logger;
 
 /**
@@ -170,70 +169,29 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
   }
 
   private void initializeNoReArrangeIndexes() {
-    List<ColumnSchema> listOfColumns =
-        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
-            .getListOfColumns();
-    List<Integer> internalOrder = new ArrayList<>();
-    List<Integer> invisibleIndex = new ArrayList<>();
-    for (ColumnSchema col : listOfColumns) {
-      // consider the invisible columns other than the dummy measure(-1)
-      if (col.isInvisible() && col.getSchemaOrdinal() != -1) {
-        invisibleIndex.add(col.getSchemaOrdinal());
-      }
-    }
-    int complexChildCount = 0;
-    for (ColumnSchema col : listOfColumns) {
-      if (col.isInvisible()) {
-        continue;
-      }
-      if (col.getColumnName().contains(".")) {
-        // If the schema ordinal is -1,
-        // no need to consider it during shifting columns to derive new shifted ordinal
-        if (col.getSchemaOrdinal() != -1) {
-          complexChildCount = complexChildCount + 1;
-        }
-      } else {
-        // get number of invisible index count before this column
-        int invisibleIndexCount = 0;
-        for (int index : invisibleIndex) {
-          if (index < col.getSchemaOrdinal()) {
-            invisibleIndexCount++;
-          }
-        }
-        if (col.getDataType().isComplexType()) {
-          // Calculate re-arrange index by ignoring the complex child count.
-          // As projection will have only parent columns
-          internalOrder.add(col.getSchemaOrdinal() - complexChildCount - invisibleIndexCount);
-        } else {
-          internalOrder.add(col.getSchemaOrdinal() - invisibleIndexCount);
-        }
-      }
-    }
+    // Data might have partition columns in the end in new insert into flow.
+    // But when convert to 3 parts, just keep in internal order. so derive index for that.
+    List<CarbonColumn> listOfColumns = new ArrayList<>();
+    listOfColumns.addAll(configuration.getTableSpec().getCarbonTable().getVisibleDimensions());
+    listOfColumns.addAll(configuration.getTableSpec().getCarbonTable().getVisibleMeasures());
     // In case of partition, partition data will be at the end. So, need to keep data position
-    List<Pair<DataField, Integer>> dataPositionList = new ArrayList<>();
+    Map<String, Integer> dataPositionMap = new HashMap<>();
     int dataPosition = 0;
     for (DataField field : configuration.getDataFields()) {
-      dataPositionList.add(Pair.of(field, dataPosition++));
-    }
-    // convert to original create order
-    dataPositionList.sort(Comparator.comparingInt(p -> p.getKey().getColumn().getSchemaOrdinal()));
-    // re-arranged data fields
-    List<Pair<DataField, Integer>> reArrangedDataFieldList = new ArrayList<>();
-    for (int index : internalOrder) {
-      reArrangedDataFieldList.add(dataPositionList.get(index));
+      dataPositionMap.put(field.getColumn().getColName(), dataPosition++);
     }
-    // get the index of each type and used for 3 parts conversion
-    for (Pair<DataField, Integer> fieldWithDataPosition : reArrangedDataFieldList) {
-      if (fieldWithDataPosition.getKey().getColumn().hasEncoding(Encoding.DICTIONARY)) {
-        directDictionaryDimensionIndex.add(fieldWithDataPosition.getValue());
+    // get the index of each type and to be used in 3 parts conversion
+    for (CarbonColumn column : listOfColumns) {
+      if (column.hasEncoding(Encoding.DICTIONARY)) {
+        directDictionaryDimensionIndex.add(dataPositionMap.get(column.getColName()));
       } else {
-        if (fieldWithDataPosition.getKey().getColumn().getDataType().isComplexType()) {
-          complexTypeIndex.add(fieldWithDataPosition.getValue());
-        } else if (fieldWithDataPosition.getKey().getColumn().isMeasure()) {
-          measureIndex.add(fieldWithDataPosition.getValue());
+        if (column.getDataType().isComplexType()) {
+          complexTypeIndex.add(dataPositionMap.get(column.getColName()));
+        } else if (column.isMeasure()) {
+          measureIndex.add(dataPositionMap.get(column.getColName()));
         } else {
           // other dimensions
-          otherDimensionIndex.add(fieldWithDataPosition.getValue());
+          otherDimensionIndex.add(dataPositionMap.get(column.getColName()));
         }
       }
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 9672de6..aaa9a38 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -99,9 +99,13 @@ public class TablePage {
     noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
     int tmpNumDictDimIdx = 0;
     int tmpNumNoDictDimIdx = 0;
-    for (int i = 0; i < dictDimensionPages.length + noDictDimensionPages.length; i++) {
+    for (int i = 0; i < tableSpec.getNumDimensions(); i++) {
       TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
-      ColumnType columnType = tableSpec.getDimensionSpec(i).getColumnType();
+      if (spec.getSchemaDataType().isComplexType()) {
+        // skip complex columns and go other dimensions.
+        // partition scenario dimensions can present after complex columns.
+        continue;
+      }
       ColumnPage page;
       if (spec.getSchemaDataType() == DataTypes.DATE) {
         page = ColumnPage.newPage(
