This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 6b7facc [CARBONDATA-3637] Use optimized insert flow for MV and insert stage command
6b7facc is described below
commit 6b7faccde15371e316900bf2e3552c653b0c05e3
Author: ajantha-bhat <[email protected]>
AuthorDate: Sat Feb 15 08:42:42 2020 +0530
[CARBONDATA-3637] Use optimized insert flow for MV and insert stage command
Why is this PR needed?
MV and the insert stage command can use the new optimized insert-into flow.
While moving them, a few issues were also found in the new flow and are fixed
here (a small illustrative sketch of the timestamp fix follows):
1. Partition handling: if the catalog table schema is already rearranged, it
must not be rearranged again.
2. The timestamp converter did not handle a value of 0; such values must be
set to null.
3. While deriving the indexes for the 3-part row conversion in the new insert
flow, the order was based on the internal partition order instead of the
internal order.
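For illustration only (not part of this commit): a minimal Scala sketch of the
timestamp fix described in point 2. The standalone helper normalizeTimestamps is
hypothetical; the real change lives in CommonLoadUtils (see the diff below) and
divides by TimeStampGranularityTypeValue.MILLIS_SECONDS, assumed here to be 1000.

  import org.apache.spark.sql.catalyst.InternalRow

  // A raw timestamp of 0 means "no value": store null instead of the epoch.
  // Otherwise scale the value down to the granularity the loader expects.
  def normalizeTimestamps(row: InternalRow, timeStampIndex: Seq[Int]): Unit = {
    for (index <- timeStampIndex) {
      if (row.getLong(index) == 0) {
        row.setNullAt(index)
      } else {
        // assumes millisecond granularity (MILLIS_SECONDS == 1000)
        row.setLong(index, row.getLong(index) / 1000L)
      }
    }
  }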
What changes were proposed in this PR?
Changed the MV and insert stage commands to use the optimized insert flow.
After this change, the two flows are used as follows (a sketch of the new call
pattern follows the commit message):
b. CarbonInsertIntoCommand -- insert DML, CTAS DML, MV, and the insert stage
command.
c. CarbonInsertIntoWithDf -- the old flow, which supports bad record handling
via the converter step; it still handles the update, compaction, dataframe
writer, and alter table scenarios [rearranging has some problems there for now].
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3615
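For illustration only (not part of this commit): a hedged sketch of the
call-pattern change summarized above, modeled on the call sites in the diff
below. The wrapper function insertWithNewFlow and its parameter list are
hypothetical; defaults are assumed for the CarbonInsertIntoCommand arguments
that are not shown.

  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
  import org.apache.carbondata.core.metadata.schema.table.TableInfo

  // Callers that previously wrapped a DataFrame in CarbonInsertIntoWithDf and
  // called process() now hand the analyzed logical plan and the target table's
  // TableInfo to CarbonInsertIntoCommand and call run().
  def insertWithNewFlow(spark: SparkSession, df: DataFrame, dbName: String,
      tableName: String, tableInfo: TableInfo, header: String): Unit = {
    CarbonInsertIntoCommand(
      databaseNameOp = Some(dbName),
      tableName = tableName,
      options = Map("fileheader" -> header),
      isOverwriteTable = false,
      logicalPlan = df.queryExecution.analyzed,
      tableInfo = tableInfo
    ).run(spark)
  }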
---
.../management/CarbonInsertFromStageCommand.scala | 10 ++-
.../management/CarbonInsertIntoCommand.scala | 13 +++-
.../command/management/CommonLoadUtils.scala | 10 ++-
.../table/CarbonCreateTableAsSelectCommand.scala | 10 +--
.../mv/extension/MVDataMapProvider.scala | 14 ++--
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 3 +
.../timeseries/TestMVTimeSeriesLoadAndQuery.scala | 13 ++++
.../loading/CarbonDataLoadConfiguration.java | 40 +++++------
.../CarbonRowDataWriterProcessorStepImpl.java | 78 +++++-----------------
.../carbondata/processing/store/TablePage.java | 8 ++-
10 files changed, 89 insertions(+), 110 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index b44cfe5..64f802f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -321,18 +321,16 @@ case class CarbonInsertFromStageCommand(
val header = columns.mkString(",")
val selectColumns = columns.filter(!partition.contains(_))
val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
- CarbonInsertIntoWithDf(
+ CarbonInsertIntoCommand(
databaseNameOp = Option(table.getDatabaseName),
tableName = table.getTableName,
options = scala.collection.immutable.Map("fileheader" -> header,
"binary_decoder" -> "base64"),
isOverwriteTable = false,
- dataFrame = selectedDataFrame,
- updateModel = None,
- tableInfoOp = None,
- internalOptions = Map.empty,
+ logicalPlan = selectedDataFrame.queryExecution.analyzed,
+ tableInfo = table.getTableInfo,
partition = partition
- ).process(spark)
+ ).run(spark)
}
LOGGER.info(s"finish data loading, time taken ${ System.currentTimeMillis() - start }ms")
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 525de29..68189e2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -176,8 +176,17 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
convertedStaticPartition)
scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
if (logicalPartitionRelation != null) {
- logicalPartitionRelation =
- getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+ if (selectedColumnSchema.length != logicalPartitionRelation.output.length) {
+ throw new RuntimeException(" schema length doesn't match partition length")
+ }
+ val isNotReArranged = selectedColumnSchema.zipWithIndex.exists {
+ case (cs, i) => !cs.getColumnName.equals(logicalPartitionRelation.output(i).name)
+ }
+ if (isNotReArranged) {
+ // Re-arrange the catalog table schema and output for partition relation
+ logicalPartitionRelation =
+ getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+ }
}
// Delete stale segment folders that are not in table status but are physically present in
// the Fact folder
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 5c0780e..0aada09 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.{DateDirectDictionaryGenerator, TimeStampGranularityTypeValue}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
@@ -728,7 +728,13 @@ object CommonLoadUtils {
}
val updatedRdd: RDD[InternalRow] = rdd.map { internalRow =>
for (index <- timeStampIndex) {
- internalRow.setLong(index, internalRow.getLong(index) / 1000)
+ if (internalRow.getLong(index) == 0) {
+ internalRow.setNullAt(index)
+ } else {
+ internalRow.setLong(
+ index,
+ internalRow.getLong(index) / TimeStampGranularityTypeValue.MILLIS_SECONDS.getValue)
+ }
}
var doubleValue: Double = 0
for (index <- doubleIndex) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
index ddfed80..9725b4e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -41,7 +41,7 @@ case class CarbonCreateTableAsSelectCommand(
ifNotExistsSet: Boolean = false,
tableLocation: Option[String] = None) extends AtomicRunnableCommand {
- var loadCommand: CarbonInsertIntoCommand = _
+ var insertIntoCommand: CarbonInsertIntoCommand = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val tableName = tableInfo.getFactTable.getTableName
@@ -76,7 +76,7 @@ case class CarbonCreateTableAsSelectCommand(
.createCarbonDataSourceHadoopRelation(sparkSession,
TableIdentifier(tableName, Option(dbName)))
// execute command to load data into carbon table
- loadCommand = CarbonInsertIntoCommand(
+ insertIntoCommand = CarbonInsertIntoCommand(
databaseNameOp = Some(carbonDataSourceHadoopRelation.carbonRelation.databaseName),
tableName = carbonDataSourceHadoopRelation.carbonRelation.tableName,
options = scala.collection.immutable
@@ -85,14 +85,14 @@ case class CarbonCreateTableAsSelectCommand(
isOverwriteTable = false,
logicalPlan = query,
tableInfo = tableInfo)
- loadCommand.processMetadata(sparkSession)
+ insertIntoCommand.processMetadata(sparkSession)
}
Seq.empty
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
- if (null != loadCommand) {
- loadCommand.processData(sparkSession)
+ if (null != insertIntoCommand) {
+ insertIntoCommand.processData(sparkSession)
}
Seq.empty
}
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
index 994bc4d..5b400b2 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
@@ -21,7 +21,7 @@ import java.io.IOException
import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonUtils, SparkSession}
-import org.apache.spark.sql.execution.command.management.CarbonInsertIntoWithDf
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
import org.apache.spark.sql.util.SparkSQLUtil
@@ -141,19 +141,17 @@ class MVDataMapProvider(
!column.getColumnName
.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
}.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
- val insertWithDf = CarbonInsertIntoWithDf(
+ val insertIntoCommand = CarbonInsertIntoCommand(
databaseNameOp = Some(identifier.getDatabaseName),
tableName = identifier.getTableName,
options = scala.collection.immutable.Map("fileheader" -> header),
isOverwriteTable,
- dataFrame = updatedQuery,
- updateModel = None,
- tableInfoOp = None,
+ logicalPlan = updatedQuery.queryExecution.analyzed,
+ tableInfo = dataMapTable.getTableInfo,
internalOptions = Map("mergedSegmentName" -> newLoadName,
- CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
- partition = Map.empty)
+ CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"))
try {
- insertWithDf.process(sparkSession)
+ insertIntoCommand.run(sparkSession)
} catch {
case ex: Exception =>
// If load to dataMap table fails, disable the dataMap and if newLoad is still
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 55ab5e8..773482f 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -1160,6 +1160,8 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test mv with duplicate columns in query and constant column") {
+ // new optimized insert into flow doesn't support duplicate column names, so send it to old flow
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
sql("drop table if exists maintable")
sql("create table maintable(name string, age int, add string) STORED AS carbondata")
sql("create materialized view dupli_mv as select name, sum(age),sum(age) from maintable group by name")
@@ -1178,6 +1180,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
assert(TestUtil.verifyMVDataMap(df4.queryExecution.optimizedPlan, "constant_mv"))
assert(TestUtil.verifyMVDataMap(df5.queryExecution.optimizedPlan, "dupli_projection"))
assert(TestUtil.verifyMVDataMap(df6.queryExecution.optimizedPlan, "dupli_projection"))
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
}
test("test mv query when the column names and table name same in join
scenario") {
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
index 11b6078..00b6bd8 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.mv.rewrite.TestUtil
class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
@@ -284,6 +286,9 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
}
test("test mv timeseries duplicate columns and constant columns") {
+ // new optimized insert into flow doesn't support duplicate column names, so send it to old flow
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
dropDataMap("datamap1")
sql(
"create materialized view datamap1 as " +
@@ -300,6 +305,8 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
val df3 = sql("select timeseries(projectjoindate,'month') ,sum(1) ex from maintable group by timeseries(projectjoindate,'month')")
checkPlan("datamap1", df3)
dropDataMap("datamap1")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
}
test("test mv timeseries with like filters") {
@@ -313,6 +320,10 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
}
test("test mv timeseries with join scenario") {
+ // this test case datamap table is created with distinct column (2 columns),
+ // but insert projection has duplicate column(3 columns). Cannot support in new insert into flow
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
sql("drop table if exists secondtable")
sql(
"CREATE TABLE secondtable (empno int,empname string, projectcode int,
projectjoindate " +
@@ -328,6 +339,8 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
" from maintable t1 inner join secondtable t2 where" +
" t2.projectcode = t1.projectcode group by timeseries(t1.projectjoindate,'month')")
checkPlan("datamap1", df)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
}
test("test create materialized view with group by columns not present in
projection") {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7575754..0965ee6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.util.OutputFilesInfoHolder;
public class CarbonDataLoadConfiguration {
@@ -236,18 +238,12 @@ public class CarbonDataLoadConfiguration {
}
public DataType[] getMeasureDataType() {
- List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
- int measureCount = 0;
- for (int i = 0; i < dataFields.length; i++) {
- if (!dataFields[i].getColumn().isDimension()) {
- measureIndexes.add(i);
- measureCount++;
- }
- }
-
- DataType[] type = new DataType[measureCount];
+ // data field might be rearranged in case of partition.
+ // so refer internal order not the data field order.
+ List<CarbonMeasure> visibleMeasures = tableSpec.getCarbonTable().getVisibleMeasures();
+ DataType[] type = new DataType[visibleMeasures.size()];
for (int i = 0; i < type.length; i++) {
- type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
+ type[i] = visibleMeasures.get(i).getDataType();
}
return type;
}
@@ -258,22 +254,16 @@ public class CarbonDataLoadConfiguration {
* @return
*/
public CarbonColumn[] getNoDictAndComplexDimensions() {
- List<Integer> noDicOrCompIndexes = new ArrayList<>(dataFields.length);
- int noDicCount = 0;
- for (int i = 0; i < dataFields.length; i++) {
- if (dataFields[i].getColumn().isDimension() && (
- dataFields[i].getColumn().getDataType() != DataTypes.DATE || dataFields[i].getColumn()
- .isComplex())) {
- noDicOrCompIndexes.add(i);
- noDicCount++;
+ // data field might be rearranged in case of partition.
+ // so refer internal order not the data field order.
+ List<CarbonDimension> visibleDimensions = tableSpec.getCarbonTable().getVisibleDimensions();
+ List<CarbonColumn> noDictionaryDimensions = new ArrayList<>();
+ for (int i = 0; i < visibleDimensions.size(); i++) {
+ if (visibleDimensions.get(i).getDataType() != DataTypes.DATE) {
+ noDictionaryDimensions.add(visibleDimensions.get(i));
}
}
-
- CarbonColumn[] dims = new CarbonColumn[noDicCount];
- for (int i = 0; i < dims.length; i++) {
- dims[i] = dataFields[noDicOrCompIndexes.get(i)].getColumn();
- }
- return dims;
+ return noDictionaryDimensions.toArray(new CarbonColumn[0]);
}
/**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 93392b5..0f5b203 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.processing.loading.steps;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -52,7 +52,6 @@ import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.Logger;
/**
@@ -170,70 +169,29 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
}
private void initializeNoReArrangeIndexes() {
- List<ColumnSchema> listOfColumns =
- configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
- .getListOfColumns();
- List<Integer> internalOrder = new ArrayList<>();
- List<Integer> invisibleIndex = new ArrayList<>();
- for (ColumnSchema col : listOfColumns) {
- // consider the invisible columns other than the dummy measure(-1)
- if (col.isInvisible() && col.getSchemaOrdinal() != -1) {
- invisibleIndex.add(col.getSchemaOrdinal());
- }
- }
- int complexChildCount = 0;
- for (ColumnSchema col : listOfColumns) {
- if (col.isInvisible()) {
- continue;
- }
- if (col.getColumnName().contains(".")) {
- // If the schema ordinal is -1,
- // no need to consider it during shifting columns to derive new shifted ordinal
- if (col.getSchemaOrdinal() != -1) {
- complexChildCount = complexChildCount + 1;
- }
- } else {
- // get number of invisible index count before this column
- int invisibleIndexCount = 0;
- for (int index : invisibleIndex) {
- if (index < col.getSchemaOrdinal()) {
- invisibleIndexCount++;
- }
- }
- if (col.getDataType().isComplexType()) {
- // Calculate re-arrange index by ignoring the complex child count.
- // As projection will have only parent columns
- internalOrder.add(col.getSchemaOrdinal() - complexChildCount - invisibleIndexCount);
- } else {
- internalOrder.add(col.getSchemaOrdinal() - invisibleIndexCount);
- }
- }
- }
+ // Data might have partition columns in the end in new insert into flow.
+ // But when convert to 3 parts, just keep in internal order. so derive index for that.
+ List<CarbonColumn> listOfColumns = new ArrayList<>();
+ listOfColumns.addAll(configuration.getTableSpec().getCarbonTable().getVisibleDimensions());
+ listOfColumns.addAll(configuration.getTableSpec().getCarbonTable().getVisibleMeasures());
// In case of partition, partition data will be at the end. So, need to keep data position
- List<Pair<DataField, Integer>> dataPositionList = new ArrayList<>();
+ Map<String, Integer> dataPositionMap = new HashMap<>();
int dataPosition = 0;
for (DataField field : configuration.getDataFields()) {
- dataPositionList.add(Pair.of(field, dataPosition++));
- }
- // convert to original create order
- dataPositionList.sort(Comparator.comparingInt(p -> p.getKey().getColumn().getSchemaOrdinal()));
- // re-arranged data fields
- List<Pair<DataField, Integer>> reArrangedDataFieldList = new ArrayList<>();
- for (int index : internalOrder) {
- reArrangedDataFieldList.add(dataPositionList.get(index));
+ dataPositionMap.put(field.getColumn().getColName(), dataPosition++);
}
- // get the index of each type and used for 3 parts conversion
- for (Pair<DataField, Integer> fieldWithDataPosition : reArrangedDataFieldList) {
- if (fieldWithDataPosition.getKey().getColumn().hasEncoding(Encoding.DICTIONARY)) {
- directDictionaryDimensionIndex.add(fieldWithDataPosition.getValue());
+ // get the index of each type and to be used in 3 parts conversion
+ for (CarbonColumn column : listOfColumns) {
+ if (column.hasEncoding(Encoding.DICTIONARY)) {
+ directDictionaryDimensionIndex.add(dataPositionMap.get(column.getColName()));
} else {
- if (fieldWithDataPosition.getKey().getColumn().getDataType().isComplexType()) {
- complexTypeIndex.add(fieldWithDataPosition.getValue());
- } else if (fieldWithDataPosition.getKey().getColumn().isMeasure()) {
- measureIndex.add(fieldWithDataPosition.getValue());
+ if (column.getDataType().isComplexType()) {
+ complexTypeIndex.add(dataPositionMap.get(column.getColName()));
+ } else if (column.isMeasure()) {
+ measureIndex.add(dataPositionMap.get(column.getColName()));
} else {
// other dimensions
- otherDimensionIndex.add(fieldWithDataPosition.getValue());
+ otherDimensionIndex.add(dataPositionMap.get(column.getColName()));
}
}
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 9672de6..aaa9a38 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -99,9 +99,13 @@ public class TablePage {
noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
int tmpNumDictDimIdx = 0;
int tmpNumNoDictDimIdx = 0;
- for (int i = 0; i < dictDimensionPages.length + noDictDimensionPages.length; i++) {
+ for (int i = 0; i < tableSpec.getNumDimensions(); i++) {
TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
- ColumnType columnType = tableSpec.getDimensionSpec(i).getColumnType();
+ if (spec.getSchemaDataType().isComplexType()) {
+ // skip complex columns and go other dimensions.
+ // partition scenario dimensions can present after complex columns.
+ continue;
+ }
ColumnPage page;
if (spec.getSchemaDataType() == DataTypes.DATE) {
page = ColumnPage.newPage(