Repository: carbondata Updated Branches: refs/heads/master ab9a4fc76 -> f49e1c397
[CARBONDATA-1422] Fixed compaction issue when decimal is present This closes #1300 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f49e1c39 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f49e1c39 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f49e1c39 Branch: refs/heads/master Commit: f49e1c3978130ca4c3b062f2fded33fbac56ae54 Parents: ab9a4fc Author: Ravindra Pesala <[email protected]> Authored: Wed Aug 30 17:49:29 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Thu Aug 31 11:31:32 2017 +0800 ---------------------------------------------------------------------- .../generated/QueriesCompactionTestCase.scala | 32 ++++++++++++++++++++ .../carbondata/spark/rdd/CarbonMergerRDD.scala | 5 +-- 2 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f49e1c39/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesCompactionTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesCompactionTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesCompactionTestCase.scala index 7e568b0..5d9a3ee 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesCompactionTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesCompactionTestCase.scala @@ -6800,6 +6800,38 @@ class QueriesCompactionTestCase extends QueryTest with BeforeAndAfterAll { } + test("Compaction_Bug_JIRA_1422") { + + sql("DROP TABLE IF EXISTS minortest") + + // Create table + sql( + s""" + CREATE TABLE minortest (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ("TABLE_BLOCKSIZE"= "256 MB") + """.stripMargin) + + sql( + s""" + LOAD DATA inpath '$resourcesPath/Data/uniqdata/2000_UniqData.csv' INTO table minortest OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1') + """.stripMargin).show() + + sql( + s""" + LOAD DATA inpath '$resourcesPath/Data/uniqdata/2000_UniqData.csv' INTO table minortest OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1') + """.stripMargin).show() + sql( + s""" + LOAD DATA inpath '$resourcesPath/Data/uniqdata/2000_UniqData.csv' INTO table minortest OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1') + """.stripMargin).show() + sql( + s""" + LOAD DATA inpath '$resourcesPath/Data/uniqdata/2000_UniqData.csv' INTO table minortest OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1') + """.stripMargin).show() + + sql("""alter table minortest compact 'minor'""") + sql("DROP TABLE IF EXISTS minortest") + } + override def afterAll { sql("drop table if exists Comp_VMALL_DICTIONARY_INCLUDE") sql("drop table if exists Comp_VMALL_DICTIONARY_INCLUDE_hive") http://git-wip-us.apache.org/repos/asf/carbondata/blob/f49e1c39/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 07264b5..52f37ef 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.spark._ -import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo} import org.apache.spark.sql.hive.DistributionUtil @@ -42,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.mutate.UpdateVO import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo} @@ -52,6 +51,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil import org.apache.carbondata.spark.MergeResult import org.apache.carbondata.spark.load.CarbonLoaderUtil import org.apache.carbondata.spark.splits.TableSplit +import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl class CarbonMergerRDD[K, V]( sc: SparkContext, @@ -175,6 +175,7 @@ class CarbonMergerRDD[K, V]( .checkIfAnyRestructuredBlockExists(segmentMapping, dataFileMetadataSegMapping, carbonTable.getTableLastUpdatedTime) + DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, carbonTable, dataFileMetadataSegMapping, restructuredBlockExists)
