Repository: carbondata Updated Branches: refs/heads/master 953efce4a -> 315f41c12
[CARBONDATA-2060] Fix insert overwrite on partition table Problem: When insert overwrite was done on a partition table using a source table with no data, the existing data was not overwritten. Solution: When insert overwrite is fired on a partition table from an empty table, a new empty segment should be created and the old segments should be deleted. This closes #1838 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/315f41c1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/315f41c1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/315f41c1 Branch: refs/heads/master Commit: 315f41c12557d9d8a355210082f096e1e6a19aca Parents: 953efce Author: akashrn5 <[email protected]> Authored: Fri Jan 19 20:27:05 2018 +0530 Committer: manishgupta88 <[email protected]> Committed: Tue Jan 23 22:51:07 2018 +0530 ---------------------------------------------------------------------- .../apache/carbondata/core/util/CarbonUtil.java | 20 ++++++----- .../hadoop/api/CarbonOutputCommitter.java | 4 ++- .../src/test/resources/partData.csv | 1 + ...tandardPartitionTableOverwriteTestCase.scala | 36 ++++++++++++++++++++ .../table/CarbonCreateTableCommand.scala | 6 ++-- .../spark/sql/hive/CarbonFileMetastore.scala | 7 ++-- 6 files changed, 59 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index f1d474a..b1c0c30 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2260,15 +2260,17 @@ public final class CarbonUtil { case S3: 
Path path = new Path(segmentPath); FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); - FileStatus[] fileStatuses = fs.listStatus(path); - if (null != fileStatuses) { - for (FileStatus dataAndIndexStatus : fileStatuses) { - String pathName = dataAndIndexStatus.getPath().getName(); - if (pathName.endsWith(CarbonTablePath.getCarbonIndexExtension()) || pathName - .endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) { - carbonIndexSize += dataAndIndexStatus.getLen(); - } else if (pathName.endsWith(CarbonTablePath.getCarbonDataExtension())) { - carbonDataSize += dataAndIndexStatus.getLen(); + if (fs.exists(path)) { + FileStatus[] fileStatuses = fs.listStatus(path); + if (null != fileStatuses) { + for (FileStatus dataAndIndexStatus : fileStatuses) { + String pathName = dataAndIndexStatus.getPath().getName(); + if (pathName.endsWith(CarbonTablePath.getCarbonIndexExtension()) || pathName + .endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) { + carbonIndexSize += dataAndIndexStatus.getLen(); + } else if (pathName.endsWith(CarbonTablePath.getCarbonDataExtension())) { + carbonDataSize += dataAndIndexStatus.getLen(); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index bc7c56f..eb18bbd 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -69,6 +69,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter { boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); CarbonLoadModel loadModel = 
CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet); + CarbonLoaderUtil.checkAndCreateCarbonDataLocation(loadModel.getSegmentId(), + loadModel.getCarbonDataLoadSchema().getCarbonTable()); CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel); } @@ -103,7 +105,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter { CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); long segmentSize = CarbonLoaderUtil .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable); - if (segmentSize > 0) { + if (segmentSize > 0 || overwriteSet) { String operationContextStr = context.getConfiguration().get( CarbonTableOutputFormat.OPERATION_CONTEXT, http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/integration/spark-common-test/src/test/resources/partData.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/partData.csv b/integration/spark-common-test/src/test/resources/partData.csv new file mode 100644 index 0000000..bd0b15c --- /dev/null +++ b/integration/spark-common-test/src/test/resources/partData.csv @@ -0,0 +1 @@ +9000,CUST_NAME_00000,ACTIVE_EMUI_VERSION_00000,1970-01-01 01:00:03,1970-01-01 02:00:03,123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1 http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala index 4104ea3..8d31134 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala @@ -174,6 +174,36 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf checkAnswer(sql("select count(*) from weather7 where month=1"), Seq(Row(2))) } + test("test insert overwrite on dynamic partition") { + sql("CREATE TABLE uniqdata_hive_dynamic (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double, INTEGER_COLUMN1 int)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','") + sql("CREATE TABLE uniqdata_string_dynamic(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) PARTITIONED BY(ACTIVE_EMUI_VERSION string) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB')") + sql(s"LOAD DATA INPATH '$resourcesPath/partData.csv' into table uniqdata_string_dynamic partition(active_emui_version='abc') OPTIONS('FILEHEADER'='CUST_ID,CUST_NAME ,ACTIVE_EMUI_VERSION,DOB,DOJ, BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1, Double_COLUMN2,INTEGER_COLUMN1','BAD_RECORDS_ACTION'='FORCE')") + sql(s"LOAD DATA INPATH '$resourcesPath/partData.csv' into table uniqdata_string_dynamic partition(active_emui_version='abc') OPTIONS('FILEHEADER'='CUST_ID,CUST_NAME 
,ACTIVE_EMUI_VERSION,DOB,DOJ, BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1, Double_COLUMN2,INTEGER_COLUMN1','BAD_RECORDS_ACTION'='FORCE')") + sql("insert overwrite table uniqdata_string_dynamic partition(active_emui_version='xxx') select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1 from uniqdata_hive_dynamic limit 10") + assert(sql("select * from uniqdata_string_dynamic").collect().length == 2) + sql("insert overwrite table uniqdata_string_dynamic select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1,ACTIVE_EMUI_VERSION from uniqdata_hive_dynamic limit 10") + checkAnswer(sql("select * from uniqdata_string_dynamic"), sql("select * from uniqdata_hive_dynamic")) + } + + test("test insert overwrite on static partition") { + sql("CREATE TABLE uniqdata_hive_static (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double, INTEGER_COLUMN1 int)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','") + sql("CREATE TABLE uniqdata_string_static(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) PARTITIONED BY(ACTIVE_EMUI_VERSION string) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB')") + sql(s"LOAD DATA INPATH '$resourcesPath/partData.csv' into table uniqdata_string_static OPTIONS('FILEHEADER'='CUST_ID,CUST_NAME ,ACTIVE_EMUI_VERSION,DOB,DOJ, BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1, 
Double_COLUMN2,INTEGER_COLUMN1','BAD_RECORDS_ACTION'='FORCE')") + sql(s"LOAD DATA INPATH '$resourcesPath/partData.csv' into table uniqdata_string_static OPTIONS('FILEHEADER'='CUST_ID,CUST_NAME ,ACTIVE_EMUI_VERSION,DOB,DOJ, BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1, Double_COLUMN2,INTEGER_COLUMN1','BAD_RECORDS_ACTION'='FORCE')") + sql("insert overwrite table uniqdata_string_static partition(active_emui_version='xxx') select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1 from uniqdata_hive_static limit 10") + assert(sql("select * from uniqdata_string_static").collect().length == 2) + sql("insert overwrite table uniqdata_string_static select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1,active_emui_version from uniqdata_hive_static limit 10") + checkAnswer(sql("select * from uniqdata_string_static"), sql("select * from uniqdata_hive_static")) + } + + test("overwrite whole partition table with empty data") { + sql("create table partitionLoadTable(name string, age int) PARTITIONED BY(address string) stored by 'carbondata'") + sql("insert into partitionLoadTable select 'abc',4,'def'") + sql("insert into partitionLoadTable select 'abd',5,'xyz'") + sql("create table noLoadTable (name string, age int, address string) stored by 'carbondata'") + sql("insert overwrite table partitionLoadTable select * from noLoadTable") + checkAnswer(sql("select * from partitionLoadTable"), sql("select * from noLoadTable")) + } override def afterAll = { dropTable @@ -188,6 +218,12 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql("drop table if exists partitionallcompaction") sql("drop table if exists weather6") sql("drop table if exists weather7") + sql("drop table if exists uniqdata_hive_static") + sql("drop table if exists 
uniqdata_hive_dynamic") + sql("drop table if exists uniqdata_string_static") + sql("drop table if exists uniqdata_string_dynamic") + sql("drop table if exists partitionLoadTable") + sql("drop table if exists noLoadTable") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index dac08e0..f38304e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -122,10 +122,10 @@ case class CarbonCreateTableCommand( CarbonEnv.getInstance(sparkSession).carbonMetastore .dropTable(tableIdentifier)(sparkSession) - val msg = s"Create table'$tableName' in database '$dbName' failed." 
- LOGGER.audit(msg) + val msg = s"Create table'$tableName' in database '$dbName' failed" + LOGGER.audit(msg.concat(", ").concat(e.getMessage)) LOGGER.error(e, msg) - CarbonException.analysisException(msg) + CarbonException.analysisException(msg.concat(", ").concat(e.getMessage)) } } val createTablePostExecutionEvent: CreateTablePostExecutionEvent = http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 744fbd8..0c52100 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.hive +import java.io.IOException import java.net.URI import java.util.UUID -import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer @@ -344,7 +344,10 @@ class CarbonFileMetastore extends CarbonMetaStore { val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath) val fileType = FileFactory.getFileType(schemaMetadataPath) if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { - FileFactory.mkdirs(schemaMetadataPath, fileType) + val isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType) + if (!isDirCreated) { + throw new IOException(s"Failed to create the metadata directory $schemaMetadataPath") + } } val thriftWriter = new ThriftWriter(schemaFilePath, false) thriftWriter.open(FileWriteOperation.OVERWRITE)
