Repository: incubator-carbondata Updated Branches: refs/heads/master 11df4513b -> 9efcacdac
Updating proper schema update time stamp Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ebc5ee97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ebc5ee97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ebc5ee97 Branch: refs/heads/master Commit: ebc5ee973cf6e55194d9804a7a70b2011505339a Parents: 11df451 Author: Manohar <manohar.craz...@gmail.com> Authored: Wed Apr 12 16:22:26 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Apr 13 15:36:19 2017 +0530 ---------------------------------------------------------------------- .../MajorCompactionIgnoreInMinorTest.scala | 53 +++++++++++++++++--- .../spark/rdd/CarbonDataRDDFactory.scala | 11 ++-- .../store/CarbonFactDataHandlerModel.java | 1 + 3 files changed, 54 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ebc5ee97/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala index a879598..9d2cf96 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala @@ -35,6 +35,10 @@ import org.apache.carbondata.hadoop.CacheClient */ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll { + val csvFilePath1 = s"$resourcesPath/compaction/compaction1.csv" + val csvFilePath2 = s"$resourcesPath/compaction/compaction2.csv" + val csvFilePath3 = s"$resourcesPath/compaction/compaction3.csv" + override def beforeAll { CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2") sql("drop table if exists ignoremajor") @@ -46,12 +50,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" + ".format'" ) - - - val csvFilePath1 = s"$resourcesPath/compaction/compaction1.csv" - val csvFilePath2 = s"$resourcesPath/compaction/compaction2.csv" - val csvFilePath3 = s"$resourcesPath/compaction/compaction3.csv" - sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" + "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" ) @@ -149,8 +147,51 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll } + /** + * Execute two major compactions sequentially + */ + test("Execute two major compactions sequentially") { + sql("drop table if exists testmajor") + sql( + "CREATE TABLE IF NOT EXISTS testmajor (country String, ID Int, date Timestamp, name " + + "String, " + + "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" + + ".format'" + ) + sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE testmajor OPTIONS" + + "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" + ) + sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE testmajor OPTIONS" + + "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" + ) + // compaction will happen here. + sql("alter table testmajor compact 'major'") + sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE testmajor OPTIONS" + + "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" + ) + sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE testmajor OPTIONS" + + "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" + ) + sql("alter table testmajor compact 'major'") + val identifier = new AbsoluteTableIdentifier( + CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION), + new CarbonTableIdentifier( + CarbonCommonConstants.DATABASE_DEFAULT_NAME, "testmajor", "ttt") + ) + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + + // merged segment should not be there + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList + assert(!segments.contains("0.1")) + assert(segments.contains("0.2")) + assert(!segments.contains("2")) + assert(!segments.contains("3")) + + } + override def afterAll { sql("drop table if exists ignoremajor") + sql("drop table if exists testmajor") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ebc5ee97/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 4f33043..ca96a17 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -162,6 +162,7 @@ object CarbonDataRDDFactory { case e: Exception => LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") lock.unlock() + throw e } } else { LOGGER.audit("Not able to acquire the compaction lock for table " + @@ -334,11 +335,11 @@ object CarbonDataRDDFactory { .tablesMeta.toArray, skipCompactionTables.asJava ) } - // giving the user his error for telling in the beeline if his triggered table - // compaction is failed. - if (!triggeredCompactionStatus) { - throw new Exception("Exception in compaction " + exception.getMessage) - } + } + // giving the user his error for telling in the beeline if his triggered table + // compaction is failed. + if (!triggeredCompactionStatus) { + throw new Exception("Exception in compaction " + exception.getMessage) } } finally { executor.shutdownNow() http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ebc5ee97/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index e64caea..15c75c1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -308,6 +308,7 @@ public class CarbonFactDataHandlerModel { CarbonTable carbonTable, SegmentProperties segmentProperties, String tableName, String tempStoreLocation) { CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel(); + carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime()); carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName()); carbonFactDataHandlerModel.setTableName(tableName); carbonFactDataHandlerModel.setMeasureCount(segmentProperties.getMeasures().size());