Repository: carbondata
Updated Branches:
  refs/heads/master 5aada46e7 -> 63b930c89
[CARBONDATA-2741] Fix measure column data being filled in the wrong order because non-existing measure columns were not skipped in case of restructure

Problem: In case of restructure, the measure ordinal list built in RestructureUtil maintains only the ordinals of the existing measures, but fillResultToColumnarBatch in DictionaryBasedVectorResultCollector did not ignore the non-existing columns in measureColumnInfo. As a result, the data of an existing column was read into a non-existing column's slot, causing a data type mismatch.

Solution: Keep measureColumnInfo in sync with the measure ordinals by skipping non-existing measure columns while building it.

This closes #2507


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/63b930c8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/63b930c8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/63b930c8

Branch: refs/heads/master
Commit: 63b930c899ae78bb100698e45a40b5c5b2ebf8a8
Parents: 5aada46
Author: Jatin <[email protected]>
Authored: Fri Jul 13 21:00:33 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Wed Jul 18 17:41:32 2018 +0530

----------------------------------------------------------------------
 .../DictionaryBasedVectorResultCollector.java |  9 +++++-
 .../partition/TestAlterPartitionTable.scala   | 29 ++++++++++---------
 .../bucketing/TableBucketingTestCase.scala    |  3 +-
 .../AlterTableValidationTestCase.scala        | 30 ++++++++++++++++++++
 4 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/63b930c8/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 4947621..8695d90 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -109,13 +109,20 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
         allColumnInfo[queryDimensions[i].getOrdinal()] = columnVectorInfo;
       }
     }
+    // Skip non-existing measure columns in measureColumnInfo, as data filling here is
+    // to be done only for existing columns; for non-existing columns the data has
+    // already been filled by the restructure-based collector.
+    int j = 0;
     for (int i = 0; i < queryMeasures.length; i++) {
+      if (!measureInfo.getMeasureExists()[i]) {
+        continue;
+      }
       ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
       columnVectorInfo.measureVectorFiller = MeasureDataVectorProcessor.MeasureVectorFillerFactory
           .getMeasureVectorFiller(queryMeasures[i].getMeasure().getDataType());
       columnVectorInfo.ordinal = queryMeasures[i].getMeasure().getOrdinal();
       columnVectorInfo.measure = queryMeasures[i];
-      this.measureColumnInfo[i] = columnVectorInfo;
+      this.measureColumnInfo[j++] = columnVectorInfo;
       allColumnInfo[queryMeasures[i].getOrdinal()] = columnVectorInfo;
     }
     dictionaryInfo = dictInfoList.toArray(new ColumnVectorInfo[dictInfoList.size()]);
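For readers skimming the change, below is a minimal, standalone sketch of the indexing pattern the fix applies (hypothetical class name and sample data, not CarbonData APIs): a separate write index keeps measureColumnInfo aligned with the ordinals of the measures that actually exist in the block, so existing data is never filled under a newly added column's slot.

// Standalone sketch of the fix's indexing pattern (hypothetical names/data, not CarbonData APIs).
public class MeasureSyncSketch {
  public static void main(String[] args) {
    // The query projects four measures; the 3rd was added via ALTER TABLE and does not
    // exist in this (older) block, so the restructure path fills it with default values.
    String[] queryMeasures = {"gamePointId", "deviceInformationId", "age", "deliverycharge"};
    boolean[] measureExists = {true, true, false, true};

    // One slot per *existing* measure, mirroring the ordinal list kept for the block.
    String[] measureColumnInfo = new String[3];

    int j = 0;
    for (int i = 0; i < queryMeasures.length; i++) {
      if (!measureExists[i]) {
        continue; // skip columns absent from the block; do not consume a slot
      }
      measureColumnInfo[j++] = queryMeasures[i];
    }

    // Prints gamePointId, deviceInformationId, deliverycharge: slot k now refers to the
    // same column as existing-measure ordinal k, so an existing column's data is never
    // interpreted with a non-existing column's data type.
    for (String name : measureColumnInfo) {
      System.out.println(name);
    }
  }
}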
http://git-wip-us.apache.org/repos/asf/carbondata/blob/63b930c8/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 6bfeb06..882630a 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -246,7 +247,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table add partition: List Partition") {
     sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_area")
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Option("default"), "list_table_area")(sqlContext.sparkSession)
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
@@ -305,7 +307,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table add partition: Range Partition") {
     sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""")
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "range_table_logdate")
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Option("default"), "range_table_logdate")(sqlContext.sparkSession)
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo
@@ -442,7 +445,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition with different List Sequence: List Partition") {
     sql("""ALTER TABLE list_table_country ADD PARTITION ('(Part1, Part2, Part3, Part4)')""".stripMargin)
     sql("""ALTER TABLE list_table_country SPLIT PARTITION(9) INTO ('Part4', 'Part2', '(Part1, Part3)')""".stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_country")
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Option("default"), "list_table_country")(sqlContext.sparkSession)
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
@@ -489,7 +493,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition with extra space in New SubList: List Partition") {
     sql("""ALTER TABLE list_table_area ADD PARTITION ('(One,Two, Three, Four)')""".stripMargin)
     sql("""ALTER TABLE list_table_area SPLIT PARTITION(6) INTO ('One', '(Two, Three )', 'Four')""".stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_area")
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Option("default"), "list_table_area")(sqlContext.sparkSession)
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
@@ -532,10 +537,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table split partition: Range Partition") {
     sql("""ALTER TABLE range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-      "default",
-      "range_table_logdate_split"
-    )
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Option("default"), "range_table_logdate_split")(sqlContext.sparkSession)
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val rangeInfo = partitionInfo.getRangeInfo
@@ -596,7 +599,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table split partition: Range Partition + Bucket") {
     sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "range_table_bucket")
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Option("default"), "range_table_bucket")(sqlContext.sparkSession)
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val rangeInfo = partitionInfo.getRangeInfo
@@ -798,11 +802,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
        | STORED BY 'carbondata' TBLPROPERTIES('PARTITION_TYPE'='RANGE', 'RANGE_INFO'='2015,2016')
       """.stripMargin)
     sql("ALTER TABLE carbon_table_default_db ADD PARTITION ('2017')")
-
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-      "default",
-      "carbon_table_default_db"
-    )
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Option("default"), "carbon_table_default_db")(sqlContext.sparkSession)
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo


http://git-wip-us.apache.org/repos/asf/carbondata/blob/63b930c8/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 65a006b..363db65 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.carbondata.bucketing
 
+import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.scalatest.BeforeAndAfterAll
@@ -52,7 +53,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " +
       "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
     sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t4")
-    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "t4")
+    val table = CarbonEnv.getCarbonTable(Option("default"), "t4")(sqlContext.sparkSession)
     if (table != null && table.getBucketingInfo("t4") != null) {
       assert(true)
     } else {


http://git-wip-us.apache.org/repos/asf/carbondata/blob/63b930c8/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index c7219f8..e146f12 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -44,6 +44,7 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
     sql("drop table if exists testalterwithbooleanwithoutdefaultvalue")
     sql("drop table if exists test")
     sql("drop table if exists retructure_iud")
+    sql("drop table if exists restructure_random_select")
 
     // clean data folder
     CarbonProperties.getInstance()
@@ -709,6 +710,34 @@ test("test alter command for boolean data type with correct default measure valu
       Seq(Row(1))
     )
   }
+
+  test("Alter table selection in random order"){
+    def test(): Unit ={
+      sql("drop table if exists restructure_random_select")
+      sql("create table restructure_random_select (imei string,channelsId string,gamePointId double,deviceInformationId double," +
+          " deliverycharge double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('table_blocksize'='2000','sort_columns'='imei')")
+      sql("insert into restructure_random_select values('abc','def',50.5,30.2,40.6) ")
+      sql("Alter table restructure_random_select add columns (age int,name String)")
+      checkAnswer(
+        sql("select gamePointId,deviceInformationId,age,name from restructure_random_select where name is NULL or channelsId=4"),
+        Seq(Row(50.5,30.2,null,null)))
+      checkAnswer(
+        sql("select age,name,gamePointId,deviceInformationId from restructure_random_select where name is NULL or channelsId=4"),
+        Seq(Row(null,null,50.5,30.2)))
+    }
+    try {
+      test()
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
+      test()
+    }
+    finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+    }
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS restructure")
     sql("drop table if exists table1")
@@ -726,5 +755,6 @@ test("test alter command for boolean data type with correct default measure valu
     sql("drop table if exists testalterwithbooleanwithoutdefaultvalue")
     sql("drop table if exists test")
     sql("drop table if exists retructure_iud")
+    sql("drop table if exists restructure_random_select")
   }
 }
