Repository: carbondata
Updated Branches:
  refs/heads/master 953efce4a -> 315f41c12


[CARBONDATA-2060] Fix insert overwrite on partition table

Problem:
When INSERT OVERWRITE was run on a partition table using a source table that
contains no data, the existing data in the partition table was not overwritten.

Solution:
When INSERT OVERWRITE is run on a partition table from an empty source table,
create a new empty segment and delete the old segments.

This closes #1838


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/315f41c1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/315f41c1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/315f41c1

Branch: refs/heads/master
Commit: 315f41c12557d9d8a355210082f096e1e6a19aca
Parents: 953efce
Author: akashrn5 <[email protected]>
Authored: Fri Jan 19 20:27:05 2018 +0530
Committer: manishgupta88 <[email protected]>
Committed: Tue Jan 23 22:51:07 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/util/CarbonUtil.java | 20 ++++++-----
 .../hadoop/api/CarbonOutputCommitter.java       |  4 ++-
 .../src/test/resources/partData.csv             |  1 +
 ...tandardPartitionTableOverwriteTestCase.scala | 36 ++++++++++++++++++++
 .../table/CarbonCreateTableCommand.scala        |  6 ++--
 .../spark/sql/hive/CarbonFileMetastore.scala    |  7 ++--
 6 files changed, 59 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index f1d474a..b1c0c30 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2260,15 +2260,17 @@ public final class CarbonUtil {
       case S3:
         Path path = new Path(segmentPath);
         FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
-        FileStatus[] fileStatuses = fs.listStatus(path);
-        if (null != fileStatuses) {
-          for (FileStatus dataAndIndexStatus : fileStatuses) {
-            String pathName = dataAndIndexStatus.getPath().getName();
-            if (pathName.endsWith(CarbonTablePath.getCarbonIndexExtension()) 
|| pathName
-                .endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) {
-              carbonIndexSize += dataAndIndexStatus.getLen();
-            } else if 
(pathName.endsWith(CarbonTablePath.getCarbonDataExtension())) {
-              carbonDataSize += dataAndIndexStatus.getLen();
+        if (fs.exists(path)) {
+          FileStatus[] fileStatuses = fs.listStatus(path);
+          if (null != fileStatuses) {
+            for (FileStatus dataAndIndexStatus : fileStatuses) {
+              String pathName = dataAndIndexStatus.getPath().getName();
+              if (pathName.endsWith(CarbonTablePath.getCarbonIndexExtension()) 
|| pathName
+                  .endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) {
+                carbonIndexSize += dataAndIndexStatus.getLen();
+              } else if 
(pathName.endsWith(CarbonTablePath.getCarbonDataExtension())) {
+                carbonDataSize += dataAndIndexStatus.getLen();
+              }
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index bc7c56f..eb18bbd 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -69,6 +69,8 @@ public class CarbonOutputCommitter extends 
FileOutputCommitter {
     boolean overwriteSet = 
CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
     CarbonLoadModel loadModel = 
CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
     CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, 
overwriteSet);
+    CarbonLoaderUtil.checkAndCreateCarbonDataLocation(loadModel.getSegmentId(),
+        loadModel.getCarbonDataLoadSchema().getCarbonTable());
     CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), 
loadModel);
   }
 
@@ -103,7 +105,7 @@ public class CarbonOutputCommitter extends 
FileOutputCommitter {
     CarbonTable carbonTable = 
loadModel.getCarbonDataLoadSchema().getCarbonTable();
     long segmentSize = CarbonLoaderUtil
         .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), 
carbonTable);
-    if (segmentSize > 0) {
+    if (segmentSize > 0 || overwriteSet) {
       String operationContextStr =
           context.getConfiguration().get(
               CarbonTableOutputFormat.OPERATION_CONTEXT,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/integration/spark-common-test/src/test/resources/partData.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/partData.csv 
b/integration/spark-common-test/src/test/resources/partData.csv
new file mode 100644
index 0000000..bd0b15c
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/partData.csv
@@ -0,0 +1 @@
+9000,CUST_NAME_00000,ACTIVE_EMUI_VERSION_00000,1970-01-01 01:00:03,1970-01-01 
02:00:03,123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1

http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
index 4104ea3..8d31134 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
@@ -174,6 +174,36 @@ class StandardPartitionTableOverwriteTestCase extends 
QueryTest with BeforeAndAf
     checkAnswer(sql("select count(*) from weather7 where month=1"), 
Seq(Row(2)))
   }
 
+  test("test insert overwrite on dynamic partition") {
+    sql("CREATE TABLE uniqdata_hive_dynamic (CUST_ID int,CUST_NAME 
String,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 
bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 
decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double, INTEGER_COLUMN1 
int)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+    sql("CREATE TABLE uniqdata_string_dynamic(CUST_ID int,CUST_NAME String,DOB 
timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 
bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 
decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 
int) PARTITIONED BY(ACTIVE_EMUI_VERSION string) STORED BY 
'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/partData.csv' into table 
uniqdata_string_dynamic partition(active_emui_version='abc') 
OPTIONS('FILEHEADER'='CUST_ID,CUST_NAME ,ACTIVE_EMUI_VERSION,DOB,DOJ, 
BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1, 
Double_COLUMN2,INTEGER_COLUMN1','BAD_RECORDS_ACTION'='FORCE')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/partData.csv' into table 
uniqdata_string_dynamic partition(active_emui_version='abc') 
OPTIONS('FILEHEADER'='CUST_ID,CUST_NAME ,ACTIVE_EMUI_VERSION,DOB,DOJ, 
BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1, 
Double_COLUMN2,INTEGER_COLUMN1','BAD_RECORDS_ACTION'='FORCE')")
+    sql("insert overwrite table uniqdata_string_dynamic 
partition(active_emui_version='xxx') select CUST_ID, CUST_NAME,DOB,doj, 
bigint_column1, bigint_column2, decimal_column1, 
decimal_column2,double_column1, double_column2,integer_column1 from 
uniqdata_hive_dynamic limit 10")
+    assert(sql("select * from uniqdata_string_dynamic").collect().length == 2)
+    sql("insert overwrite table uniqdata_string_dynamic select CUST_ID, 
CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, 
decimal_column2,double_column1, 
double_column2,integer_column1,ACTIVE_EMUI_VERSION from uniqdata_hive_dynamic 
limit 10")
+    checkAnswer(sql("select * from uniqdata_string_dynamic"), sql("select * 
from uniqdata_hive_dynamic"))
+  }
+
+  test("test insert overwrite on static partition") {
+    sql("CREATE TABLE uniqdata_hive_static (CUST_ID int,CUST_NAME 
String,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 
bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 
decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double, INTEGER_COLUMN1 
int)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+    sql("CREATE TABLE uniqdata_string_static(CUST_ID int,CUST_NAME String,DOB 
timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 
bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 
decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 
int) PARTITIONED BY(ACTIVE_EMUI_VERSION string) STORED BY 
'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/partData.csv' into table 
uniqdata_string_static OPTIONS('FILEHEADER'='CUST_ID,CUST_NAME 
,ACTIVE_EMUI_VERSION,DOB,DOJ, 
BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1, 
Double_COLUMN2,INTEGER_COLUMN1','BAD_RECORDS_ACTION'='FORCE')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/partData.csv' into table 
uniqdata_string_static OPTIONS('FILEHEADER'='CUST_ID,CUST_NAME 
,ACTIVE_EMUI_VERSION,DOB,DOJ, 
BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1, 
Double_COLUMN2,INTEGER_COLUMN1','BAD_RECORDS_ACTION'='FORCE')")
+    sql("insert overwrite table uniqdata_string_static 
partition(active_emui_version='xxx') select CUST_ID, CUST_NAME,DOB,doj, 
bigint_column1, bigint_column2, decimal_column1, 
decimal_column2,double_column1, double_column2,integer_column1 from 
uniqdata_hive_static limit 10")
+    assert(sql("select * from uniqdata_string_static").collect().length == 2)
+    sql("insert overwrite table uniqdata_string_static select CUST_ID, 
CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, 
decimal_column2,double_column1, 
double_column2,integer_column1,active_emui_version from uniqdata_hive_static 
limit 10")
+    checkAnswer(sql("select * from uniqdata_string_static"), sql("select * 
from uniqdata_hive_static"))
+  }
+
+  test("overwrite whole partition table with empty data") {
+    sql("create table partitionLoadTable(name string, age int) PARTITIONED 
BY(address string) stored by 'carbondata'")
+    sql("insert into partitionLoadTable select 'abc',4,'def'")
+    sql("insert into partitionLoadTable select 'abd',5,'xyz'")
+    sql("create table noLoadTable (name string, age int, address string) 
stored by 'carbondata'")
+    sql("insert overwrite table partitionLoadTable select * from noLoadTable")
+    checkAnswer(sql("select * from partitionLoadTable"), sql("select * from 
noLoadTable"))
+  }
 
   override def afterAll = {
     dropTable
@@ -188,6 +218,12 @@ class StandardPartitionTableOverwriteTestCase extends 
QueryTest with BeforeAndAf
     sql("drop table if exists partitionallcompaction")
     sql("drop table if exists weather6")
     sql("drop table if exists weather7")
+    sql("drop table if exists uniqdata_hive_static")
+    sql("drop table if exists uniqdata_hive_dynamic")
+    sql("drop table if exists uniqdata_string_static")
+    sql("drop table if exists uniqdata_string_dynamic")
+    sql("drop table if exists partitionLoadTable")
+    sql("drop table if exists noLoadTable")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index dac08e0..f38304e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -122,10 +122,10 @@ case class CarbonCreateTableCommand(
             CarbonEnv.getInstance(sparkSession).carbonMetastore
               .dropTable(tableIdentifier)(sparkSession)
 
-            val msg = s"Create table'$tableName' in database '$dbName' failed."
-            LOGGER.audit(msg)
+            val msg = s"Create table'$tableName' in database '$dbName' failed"
+            LOGGER.audit(msg.concat(", ").concat(e.getMessage))
             LOGGER.error(e, msg)
-            CarbonException.analysisException(msg)
+            CarbonException.analysisException(msg.concat(", 
").concat(e.getMessage))
         }
       }
       val createTablePostExecutionEvent: CreateTablePostExecutionEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/315f41c1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 744fbd8..0c52100 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.IOException
 import java.net.URI
 import java.util.UUID
-import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -344,7 +344,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val schemaMetadataPath = 
CarbonTablePath.getFolderContainingFile(schemaFilePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-      FileFactory.mkdirs(schemaMetadataPath, fileType)
+      val isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType)
+      if (!isDirCreated) {
+        throw new IOException(s"Failed to create the metadata directory 
$schemaMetadataPath")
+      }
     }
     val thriftWriter = new ThriftWriter(schemaFilePath, false)
     thriftWriter.open(FileWriteOperation.OVERWRITE)

Reply via email to