Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 b360f9084 -> b9a6b6865


[CARBONDATA-2217] Fix drop partition for non-existing partition and set 
FactTimeStamp during compaction for partition table

Problem:
1) When drop partition is fired for a column which does not exist, it throws 
a null pointer exception
2) select * does not work when the clean files operation is fired after the 
second level of compaction; it sometimes throws an exception
3) A new segment is created for all the segments if any one partition is 
dropped

Solution:
1) Add a null check for the case where the column does not exist
2) Give a different timestamp to fact files during compaction to avoid 
deletion of files during clean files
3) A new segment file should be written only for the partition which is 
dropped, and not for all the partitions
4) This PR also contains a fix for creating a pre-aggregate table with the 
same name as one which has already been created in another database

This closes #2017


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b9a6b686
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b9a6b686
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b9a6b686

Branch: refs/heads/branch-1.3
Commit: b9a6b68658fd0f7f408102374b3ef31dcfe44cea
Parents: b360f90
Author: akashrn5 <akashnilu...@gmail.com>
Authored: Wed Feb 28 17:28:43 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Fri Mar 2 19:10:24 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/SegmentFileStore.java         | 24 +++++++++--------
 .../preaggregate/TestPreAggCreateCommand.scala  | 27 ++++++++++++++++++--
 .../StandardPartitionGlobalSortTestCase.scala   | 13 ++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  3 +++
 .../spark/rdd/CarbonTableCompactor.scala        |  3 +++
 .../datamap/CarbonCreateDataMapCommand.scala    |  2 +-
 .../management/CarbonLoadDataCommand.scala      | 21 ++++++++++-----
 7 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9a6b686/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index b5f5a25..1902ab9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -371,19 +371,23 @@ public class SegmentFileStore {
       }
       Path path = new Path(location);
       // Update the status to delete if path equals
-      for (PartitionSpec spec : partitionSpecs) {
-        if (path.equals(spec.getLocation())) {
-          
entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
-          updateSegment = true;
-          break;
+      if (null != partitionSpecs) {
+        for (PartitionSpec spec : partitionSpecs) {
+          if (path.equals(spec.getLocation())) {
+            
entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+            updateSegment = true;
+            break;
+          }
         }
       }
     }
-    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
-    writePath =
-        writePath + CarbonCommonConstants.FILE_SEPARATOR + 
segment.getSegmentNo() + "_" + uniqueId
-            + CarbonTablePath.SEGMENT_EXT;
-    writeSegmentFile(segmentFile, writePath);
+    if (updateSegment) {
+      String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+      writePath =
+          writePath + CarbonCommonConstants.FILE_SEPARATOR + 
segment.getSegmentNo() + "_" + uniqueId
+              + CarbonTablePath.SEGMENT_EXT;
+      writeSegmentFile(segmentFile, writePath);
+    }
     // Check whether we can completly remove the segment.
     boolean deleteSegment = true;
     for (Map.Entry<String, FolderDetails> entry : 
segmentFile.getLocationMap().entrySet()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9a6b686/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 1e59a80..8b71a31 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -34,11 +34,13 @@ import 
org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, M
 class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
+    sql("drop database if exists otherDB cascade")
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
     sql("drop table if exists PreAggMain2")
     sql("drop table if exists maintable")
     sql("drop table if exists showTables")
+    sql("drop table if exists Preagg_twodb")
     sql("create table preaggMain (a string, b string, c string) stored by 
'carbondata'")
     sql("create table preaggMain1 (a string, b string, c string) stored by 
'carbondata' tblProperties('DICTIONARY_INCLUDE' = 'a')")
     sql("create table preaggMain2 (a string, b string, c string) stored by 
'carbondata'")
@@ -377,13 +379,32 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 
-  test("test show tables filterted with datamaps"){
+  test("test show tables filterted with datamaps") {
     sql("create table showTables(name string, age int) stored by 'carbondata'")
     sql("create datamap preAgg on table showTables using 'preaggregate' as 
select sum(age) from showTables")
     sql("show tables").show()
     assert(!sql("show tables").collect().contains("showTables_preagg"))
   }
 
+  test("test create main and preagg table of same name in two database") {
+    sql("drop table if exists Preagg_twodb")
+    sql("create table Preagg_twodb(name string, age int) stored by 
'carbondata'")
+    sql("create datamap sameName on table Preagg_twodb using 'preaggregate' as 
select sum(age) from Preagg_twodb")
+    sql("create database otherDB")
+    sql("use otherDB")
+    sql("drop table if exists Preagg_twodb")
+    sql("create table Preagg_twodb(name string, age int) stored by 
'carbondata'")
+    try {
+      sql(
+        "create datamap sameName on table Preagg_twodb using 'preaggregate' as 
select sum(age) from Preagg_twodb")
+      assert(true)
+    } catch {
+      case ex: Exception =>
+        assert(false)
+    }
+    sql("use default")
+  }
+
   def getCarbontable(plan: LogicalPlan) : CarbonTable ={
     var carbonTable : CarbonTable = null
     plan.transform {
@@ -405,12 +426,14 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     carbonTable
   }
 
-  override def afterAll {
+  override def afterAll { 
+    sql("drop database if exists otherDB cascade")
     sql("drop table if exists maintable")
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
     sql("drop table if exists PreAggMain2")
     sql("drop table if exists maintabletime")
     sql("drop table if exists showTables")
+    sql("drop table if exists Preagg_twodb")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9a6b686/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 7d0959c..ff062cd 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -914,6 +914,17 @@ class StandardPartitionGlobalSortTestCase extends 
QueryTest with BeforeAndAfterA
     assert(sql("show segments for table comp_dt2").collect().length == 3)
     assert(sql("select * from comp_dt2").collect().length == 13)
     sql("clean files for table comp_dt2")
+    assert(sql("show segments for table comp_dt2").collect().length == 1)
+    assert(sql("select * from comp_dt2").collect().length == 13)
+  }
+
+  test("test insert into partition column which does not exists") {
+    sql("drop table if exists partitionNoColumn")
+    sql("create table partitionNoColumn (name string, dob date) partitioned 
by(year int,month int) stored by 'carbondata'")
+    val exMessage = intercept[Exception] {
+      sql("insert into partitionNoColumn partition(year=2014,month=01,day=01) 
select 'martin','2014-04-07'")
+    }
+    assert(exMessage.getMessage.contains("day is not a valid partition column 
in table default.partitionnocolumn"))
   }
 
   override def afterAll = {
@@ -976,5 +987,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest 
with BeforeAndAfterA
     sql("drop table if exists partitiondecimal")
     sql("drop table if exists partitiondecimalstatic")
     sql("drop table if exists partitiondatadelete")
+    sql("drop table if exists comp_dt2")
+    sql("drop table if exists partitionNoColumn")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9a6b686/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1695a13..9985a3a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -544,6 +544,9 @@ object CarbonDataRDDFactory {
       }
       try {
         // compaction handling
+        if (carbonTable.isHivePartitionTable) {
+          carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+        }
         handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable, 
operationContext)
       } catch {
         case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9a6b686/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 07acaa5..a4fa37a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -96,6 +96,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           segList,
           compactionModel.compactionType
         )
+        if 
(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+          carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+        }
       }
       else {
         loadsToMerge.clear()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9a6b686/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 0fd5437..0eb7ad5 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -53,7 +53,7 @@ case class CarbonCreateDataMapCommand(
       throw new MalformedCarbonCommandException("Streaming table does not 
support creating datamap")
     }
     val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = tableIdentifier.database.getOrElse("default")
+    val dbName = 
tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
     val tableName = tableIdentifier.table + "_" + dataMapName
     val newDmProperties = if 
(dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).isDefined) {
       dmProperties.updated(TimeSeriesUtil.TIMESERIES_EVENTTIME,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9a6b686/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7800d3e..71b8387 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -563,13 +563,20 @@ case class CarbonLoadDataCommand(
     var partitionsLen = 0
     val sortScope = 
CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
     val partitionValues = if (partition.nonEmpty) {
-      partition.filter(_._2.nonEmpty).map{ case(col, value) =>
-        val field = catalogTable.schema.find(_.name.equalsIgnoreCase(col)).get
-        CarbonScalaUtil.convertToDateAndTimeFormats(
-          value.get,
-          field.dataType,
-          timeStampFormat,
-          dateFormat)
+      partition.filter(_._2.nonEmpty).map { case (col, value) =>
+        catalogTable.schema.find(_.name.equalsIgnoreCase(col)) match {
+          case Some(c) =>
+            CarbonScalaUtil.convertToDateAndTimeFormats(
+              value.get,
+              c.dataType,
+              timeStampFormat,
+              dateFormat)
+          case None =>
+            throw new AnalysisException(s"$col is not a valid partition column 
in table ${
+              carbonLoadModel
+                .getDatabaseName
+            }.${ carbonLoadModel.getTableName }")
+        }
       }.toArray
     } else {
       Array[String]()

Reply via email to