[CARBONDATA-1526] [PreAgg] Added support to compact segments in pre-agg table
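The feature described below has two entry points: compacting a parent table (which cascades to every pre-aggregate table created on it) and compacting a pre-aggregate table directly. A quick sketch, with illustrative names mirroring the test suite in this diff (sql is the test helper):

  // Compacting the parent table also compacts all of its pre-aggregate tables.
  sql("alter table maintable compact 'minor'")
  sql("alter table maintable compact 'major'")
  // A pre-aggregate table can also be compacted on its own.
  sql("alter table maintable_preagg_sum compact 'minor'")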
This PR adds support to compact pre-aggregate tables. A pre-aggregate table can be compacted using the alter command, i.e. alter table table_name compact 'minor'/'major'. If a table that has pre-aggregate tables is compacted, all of its pre-aggregate tables are compacted along with the parent table.

This closes #1605

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2304303c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2304303c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2304303c

Branch: refs/heads/master
Commit: 2304303ca4917b087159ae9888c8bddbb761b048
Parents: 6dcf4eb
Author: kunal642 <[email protected]>
Authored: Wed Nov 22 19:33:37 2017 +0530
Committer: ravipesala <[email protected]>
Committed: Thu Dec 7 21:20:16 2017 +0530

----------------------------------------------------------------------
 .../TestPreAggregateCompaction.scala            | 181 ++++++++++++++
 .../spark/compaction/CompactionCallable.java    |  44 ----
 .../org/apache/carbondata/api/CarbonStore.scala |   6 +-
 .../carbondata/events/AlterTableEvents.scala    |  64 ++---
 .../org/apache/carbondata/events/Events.scala   |   4 +-
 .../apache/carbondata/events/LoadEvents.scala   |   8 +
 .../apache/carbondata/spark/rdd/Compactor.scala | 167 -------------
 .../spark/rdd/DataManagementFunc.scala          | 225 ------------------
 .../carbondata/spark/util/DataLoadingUtil.scala |  75 +++++-
 .../spark/rdd/AggregateDataMapCompactor.scala   | 118 +++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  34 +--
 .../spark/rdd/CarbonTableCompactor.scala        | 238 +++++++++++++++++++
 .../spark/rdd/CompactionFactory.scala           |  53 +++++
 .../apache/carbondata/spark/rdd/Compactor.scala |  63 +++++
 .../org/apache/spark/sql/CarbonSession.scala    |   4 +-
 .../management/CarbonLoadDataCommand.scala      |   6 +-
 .../preaaggregate/PreAggregateListeners.scala   |  38 ++-
 .../preaaggregate/PreAggregateUtil.scala        |  19 +-
 .../processing/util/CarbonLoaderUtil.java       |  11 +-
 19 files changed, 864 insertions(+), 494 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
new file mode 100644
index 0000000..89cf8eb
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
@@ -0,0 +1,181 @@
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.integration.spark.testsuite.preaggregate + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.Matchers._ + +class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { + + val testData = s"$resourcesPath/sample.csv" + + override def beforeEach(): Unit = { + sql("drop database if exists compaction cascade") + sql("create database if not exists compaction") + sql("use compaction") + sql("create table testtable (id int, name string, city string, age int) STORED BY 'org.apache.carbondata.format'") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id""" + .stripMargin) + sql( + s"""create datamap preagg_avg on table maintable using 'preaggregate' as select id,avg(age) from maintable group by id""" + .stripMargin) + } + + test("test if pre-agg table is compacted with parent table minor compaction") { + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("insert into testtable select * from maintable") + val sumResult = sql("select id, sum(age) from testtable group by id").collect() + val avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect() + sql("alter table maintable compact 'minor'") + val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0")) + checkAnswer(sql("select * from maintable_preagg_sum"), sumResult) + val segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString) + segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0")) + checkAnswer(sql("select * from maintable_preagg_avg"), avgResult) + } + + test("test if pre-agg table is compacted with parent table major compaction") { + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("alter table maintable compact 'major'") + sql("insert into testtable select * from maintable") + val sumResult = sql("select id, sum(age) from testtable group by id").collect() + val avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect() + sql("alter table maintable compact 'minor'") + val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0")) + checkAnswer(sql("select * from maintable_preagg_sum"), 
sumResult) + val segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString) + segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0")) + checkAnswer(sql("select * from maintable_preagg_avg"), avgResult) + } + + test("test if 2nd level minor compaction is successful for pre-agg table") { + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("alter table maintable compact 'minor'") + var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0")) + sql("insert into testtable select * from maintable") + var sumResult = sql("select id, sum(age) from testtable group by id").collect() + var avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect() + checkAnswer(sql("select * from maintable_preagg_sum"), sumResult) + var segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString) + segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0")) + checkAnswer(sql("select * from maintable_preagg_avg"), avgResult) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("alter table maintable compact 'minor'") + sql("insert overwrite table testtable select * from maintable") + sumResult = sql("select id, sum(age) from testtable group by id").collect() + avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect() + segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7")) + checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult) + segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString) + segmentNamesAvg.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7")) + checkAnswer(sql("select maintable_id, sum(maintable_age_sum), sum(maintable_age_count) from maintable_preagg_avg group by maintable_id"), avgResult) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("alter table maintable compact 'minor'") + sql("insert overwrite table testtable select * from maintable") + sumResult = sql("select id, sum(age) from testtable group by id").collect() + avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect() + segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0")) + checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult) + segmentNamesAvg = sql("show segments for table 
maintable_preagg_avg").collect().map(_.get(0).toString) + segmentNamesAvg should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0")) + checkAnswer(sql("select maintable_id, sum(maintable_age_sum), sum(maintable_age_count) from maintable_preagg_avg group by maintable_id"), avgResult) + } + + test("test direct minor compaction on pre-agg tables") { + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("alter table maintable_preagg_sum compact 'minor'") + var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0")) + sql("insert into testtable select * from maintable") + var sumResult = sql("select id, sum(age) from testtable group by id").collect() + checkAnswer(sql("select * from maintable_preagg_sum"), sumResult) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("alter table maintable_preagg_sum compact 'minor'") + sql("insert overwrite table testtable select * from maintable") + sumResult = sql("select id, sum(age) from testtable group by id").collect() + segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7")) + checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("alter table maintable_preagg_sum compact 'minor'") + sql("insert overwrite table testtable select * from maintable") + sumResult = sql("select id, sum(age) from testtable group by id").collect() + segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0")) + checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult) + } + + test("test if minor/major compaction is successful for pre-agg table") { + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql("alter table maintable_preagg_sum compact 'minor'") + var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum should equal (Array("3","2","1","0.1", "0")) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table 
maintable") + sql("alter table maintable_preagg_sum compact 'major'") + segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) + segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7")) + } + + override def afterAll(): Unit = { + sql("drop database if exists compaction cascade") + sql("use default") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java deleted file mode 100644 index 2773eef..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.compaction; - -import java.util.concurrent.Callable; - -import org.apache.carbondata.spark.rdd.Compactor; - -import org.apache.spark.sql.execution.command.CompactionCallableModel; - -/** - * Callable class which is used to trigger the compaction in a separate callable. 
- */ -public class CompactionCallable implements Callable<Void> { - - private final CompactionCallableModel compactionCallableModel; - - public CompactionCallable(CompactionCallableModel compactionCallableModel) { - - this.compactionCallableModel = compactionCallableModel; - } - - @Override public Void call() throws Exception { - - Compactor.triggerCompaction(compactionCallableModel); - return null; - - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index f4f569b..2b127e4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -22,9 +22,7 @@ import java.lang.Long import scala.collection.JavaConverters._ import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types.TimestampType import org.apache.spark.sql.util.CarbonException import org.apache.spark.unsafe.types.UTF8String @@ -37,7 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.rdd.DataManagementFunc +import org.apache.carbondata.spark.util.DataLoadingUtil object CarbonStore { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -127,7 +125,7 @@ object CarbonStore { FileFactory.getCarbonFile(absIdent.getTablePath, FileFactory.getFileType(absIdent.getTablePath))) } else { - DataManagementFunc.deleteLoadsAndUpdateMetadata( + DataLoadingUtil.deleteLoadsAndUpdateMetadata( isForceDeletion = true, carbonTable) CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala index 1a0c305..7caad43 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala @@ -18,7 +18,7 @@ package org.apache.carbondata.events import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel} +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -29,7 +29,8 @@ import 
org.apache.carbondata.processing.loading.model.CarbonLoadModel * @param alterTableDropColumnModel * @param sparkSession */ -case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable, +case class AlterTableDropColumnPreEvent( + carbonTable: CarbonTable, alterTableDropColumnModel: AlterTableDropColumnModel, sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo @@ -40,7 +41,9 @@ case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable, * @param carbonTable * @param alterTableDataTypeChangeModel */ -case class AlterTableDataTypeChangePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable, +case class AlterTableDataTypeChangePreEvent( + sparkSession: SparkSession, + carbonTable: CarbonTable, alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends Event with AlterTableDataTypeChangeEventInfo @@ -50,7 +53,9 @@ case class AlterTableDataTypeChangePreEvent(sparkSession: SparkSession, carbonTa * @param carbonTable * @param alterTableDataTypeChangeModel */ -case class AlterTableDataTypeChangePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable, +case class AlterTableDataTypeChangePostEvent( + sparkSession: SparkSession, + carbonTable: CarbonTable, alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends Event with AlterTableDataTypeChangeEventInfo @@ -60,7 +65,8 @@ case class AlterTableDataTypeChangePostEvent(sparkSession: SparkSession, carbonT * @param alterTableDropColumnModel * @param sparkSession */ -case class AlterTableDropColumnPostEvent(carbonTable: CarbonTable, +case class AlterTableDropColumnPostEvent( + carbonTable: CarbonTable, alterTableDropColumnModel: AlterTableDropColumnModel, sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo @@ -71,7 +77,8 @@ case class AlterTableDropColumnPostEvent(carbonTable: CarbonTable, * @param alterTableDropColumnModel * @param sparkSession */ -case class AlterTableDropColumnAbortEvent(carbonTable: CarbonTable, +case class AlterTableDropColumnAbortEvent( + carbonTable: CarbonTable, alterTableDropColumnModel: AlterTableDropColumnModel, sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo @@ -83,7 +90,8 @@ case class AlterTableDropColumnAbortEvent(carbonTable: CarbonTable, * @param newTablePath * @param sparkSession */ -case class AlterTableRenamePreEvent(carbonTable: CarbonTable, +case class AlterTableRenamePreEvent( + carbonTable: CarbonTable, alterTableRenameModel: AlterTableRenameModel, newTablePath: String, sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo @@ -92,7 +100,9 @@ case class AlterTableRenamePreEvent(carbonTable: CarbonTable, * @param carbonTable * @param alterTableAddColumnsModel */ -case class AlterTableAddColumnPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable, +case class AlterTableAddColumnPreEvent( + sparkSession: SparkSession, + carbonTable: CarbonTable, alterTableAddColumnsModel: AlterTableAddColumnsModel) extends Event with AlterTableAddColumnEventInfo @@ -101,7 +111,9 @@ case class AlterTableAddColumnPreEvent(sparkSession: SparkSession, carbonTable: * @param carbonTable * @param alterTableAddColumnsModel */ -case class AlterTableAddColumnPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable, +case class AlterTableAddColumnPostEvent( + sparkSession: SparkSession, + carbonTable: CarbonTable, alterTableAddColumnsModel: AlterTableAddColumnsModel) extends Event with AlterTableAddColumnEventInfo @@ -113,7 +125,8 @@ case class AlterTableAddColumnPostEvent(sparkSession: 
SparkSession, carbonTable: * @param newTablePath * @param sparkSession */ -case class AlterTableRenamePostEvent(carbonTable: CarbonTable, +case class AlterTableRenamePostEvent( + carbonTable: CarbonTable, alterTableRenameModel: AlterTableRenameModel, newTablePath: String, sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo @@ -125,33 +138,29 @@ case class AlterTableRenamePostEvent(carbonTable: CarbonTable, * @param newTablePath * @param sparkSession */ -case class AlterTableRenameAbortEvent(carbonTable: CarbonTable, +case class AlterTableRenameAbortEvent( + carbonTable: CarbonTable, alterTableRenameModel: AlterTableRenameModel, newTablePath: String, sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo -/** - * - * @param carbonTable - * @param carbonLoadModel - * @param mergedLoadName - * @param sQLContext - */ -case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable, - carbonLoadModel: CarbonLoadModel, +case class AlterTableCompactionPreEvent( + carbonTable: CarbonTable, + carbonMergerMapping: CarbonMergerMapping, mergedLoadName: String, - sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo + sqlContext: SQLContext) extends Event with AlterTableCompactionEventInfo /** * * @param carbonTable - * @param carbonLoadModel + * @param carbonMergerMapping * @param mergedLoadName * @param sQLContext */ -case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable, - carbonLoadModel: CarbonLoadModel, +case class AlterTableCompactionPostEvent( + carbonTable: CarbonTable, + carbonMergerMapping: CarbonMergerMapping, mergedLoadName: String, sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo @@ -160,11 +169,12 @@ case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable * Class for handling clean up in case of any failure and abort the operation * * @param carbonTable - * @param carbonLoadModel + * @param carbonMergerMapping * @param mergedLoadName * @param sQLContext */ -case class AlterTableCompactionAbortEvent(carbonTable: CarbonTable, - carbonLoadModel: CarbonLoadModel, +case class AlterTableCompactionAbortEvent( + carbonTable: CarbonTable, + carbonMergerMapping: CarbonMergerMapping, mergedLoadName: String, sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala index 6279fca..4af337b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala @@ -18,7 +18,7 @@ package org.apache.carbondata.events import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel} +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -95,7 +95,7 @@ trait AlterTableAddColumnEventInfo { */ trait AlterTableCompactionEventInfo { val carbonTable: CarbonTable - val carbonLoadModel: CarbonLoadModel + val carbonMergerMapping: CarbonMergerMapping val mergedLoadName: String } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala index 12f2922..84dde84 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala @@ -45,6 +45,14 @@ case class LoadTablePostExecutionEvent(sparkSession: SparkSession, carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo /** + * Class for handling operations after data load completion and before final commit of load + * operation. Example usage: For loading pre-aggregate tables + */ +case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession, + carbonTableIdentifier: CarbonTableIdentifier, + carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo + +/** * Class for handling clean up in case of any failure and abort the operation. */ case class LoadTableAbortExecutionEvent(sparkSession: SparkSession, http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala deleted file mode 100644 index e41211a..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.rdd - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel} - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.events._ -import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} -import org.apache.carbondata.spark.MergeResultImpl -import org.apache.carbondata.spark.util.CommonUtil - -/** - * Compactor class which handled the compaction cases. - */ -object Compactor { - - val logger = LogServiceFactory.getLogService(Compactor.getClass.getName) - - def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = { - - val carbonTable = compactionCallableModel.carbonTable - val loadsToMerge = compactionCallableModel.loadsToMerge - val sc = compactionCallableModel.sqlContext - val carbonLoadModel = compactionCallableModel.carbonLoadModel - val compactionType = compactionCallableModel.compactionType - val storePath = carbonLoadModel.getTablePath - val startTime = System.nanoTime() - val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge) - var finalMergeStatus = false - val databaseName: String = carbonLoadModel.getDatabaseName - val factTableName = carbonLoadModel.getTableName - val validSegments: Array[String] = CarbonDataMergerUtil - .getValidSegments(loadsToMerge).split(',') - val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime() - val carbonMergerMapping = CarbonMergerMapping(storePath, - carbonTable.getMetaDataFilepath, - mergedLoadName, - databaseName, - factTableName, - validSegments, - carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, - compactionType, - maxSegmentColCardinality = null, - maxSegmentColumnSchemaList = null - ) - carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation) - carbonLoadModel.setLoadMetadataDetails( - SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava) - // trigger event for compaction - val operationContext = new OperationContext - val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = - AlterTableCompactionPreEvent(compactionCallableModel.sqlContext.sparkSession, - carbonTable, - carbonLoadModel, - mergedLoadName, - sc) - OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext) - - var execInstance = "1" - // in case of non dynamic executor allocation, number of executors are fixed. - if (sc.sparkContext.getConf.contains("spark.executor.instances")) { - execInstance = sc.sparkContext.getConf.get("spark.executor.instances") - logger.info(s"spark.executor.instances property is set to = $execInstance") - } // in case of dynamic executor allocation, taking the max executors of the dynamic allocation. 
- else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) { - if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim - .equalsIgnoreCase("true")) { - execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors") - logger.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance") - } - } - - val mergeStatus = - if (CompactionType.IUD_UPDDEL_DELTA == compactionType) { - new CarbonIUDMergerRDD( - sc.sparkContext, - new MergeResultImpl(), - carbonLoadModel, - carbonMergerMapping, - execInstance - ).collect - } else { - new CarbonMergerRDD( - sc.sparkContext, - new MergeResultImpl(), - carbonLoadModel, - carbonMergerMapping, - execInstance - ).collect - } - - if (mergeStatus.length == 0) { - finalMergeStatus = false - } else { - finalMergeStatus = mergeStatus.forall(_._2) - } - - if (finalMergeStatus) { - val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) - CommonUtil.mergeIndexFiles( - sc.sparkContext, Seq(mergedLoadNumber), storePath, carbonTable, false) - - // trigger event for compaction - val alterTableCompactionPostEvent: AlterTableCompactionPostEvent = - AlterTableCompactionPostEvent(compactionCallableModel.sqlContext.sparkSession, - carbonTable, - carbonLoadModel, - mergedLoadName, - sc) - OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext) - - val endTime = System.nanoTime() - logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }") - val statusFileUpdation = - ((compactionType == CompactionType.IUD_UPDDEL_DELTA) && - CarbonDataMergerUtil - .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge, - carbonTable.getMetaDataFilepath, - carbonLoadModel)) || - CarbonDataMergerUtil - .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath, - mergedLoadNumber, carbonLoadModel, mergeLoadStartTime, compactionType) - - if (!statusFileUpdation) { - logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." + - s"${ carbonLoadModel.getTableName }") - logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." + - s"${ carbonLoadModel.getTableName }") - throw new Exception(s"Compaction failed to update metadata for table" + - s" ${ carbonLoadModel.getDatabaseName }." 
+ - s"${ carbonLoadModel.getTableName }") - } else { - logger.audit(s"Compaction request completed for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - logger.info(s"Compaction request completed for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - } - } else { - logger.audit(s"Compaction request failed for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" - ) - logger.error(s"Compaction request failed for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - throw new Exception("Compaction Failure in Merger Rdd.") - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala deleted file mode 100644 index 26a66f6..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.rdd - -import java.util -import java.util.concurrent._ - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel} - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} -import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders} -import org.apache.carbondata.spark.compaction.CompactionCallable -import org.apache.carbondata.spark.util.CommonUtil - -/** - * Common functions for data life cycle management - */ -object DataManagementFunc { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - def executeCompaction(carbonLoadModel: CarbonLoadModel, - compactionModel: CompactionModel, - executor: ExecutorService, - sqlContext: SQLContext, - storeLocation: String): Unit = { - val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails]( - carbonLoadModel.getLoadMetadataDetails - ) - CarbonDataMergerUtil.sortSegments(sortedSegments) - - var segList = carbonLoadModel.getLoadMetadataDetails - var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged( - carbonLoadModel, - compactionModel.compactionSize, - segList, - compactionModel.compactionType - ) - while (loadsToMerge.size() > 1 || - (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType && - loadsToMerge.size() > 0)) { - val lastSegment = sortedSegments.get(sortedSegments.size() - 1) - deletePartialLoadsInCompaction(carbonLoadModel) - val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]]( - CarbonCommonConstants - .DEFAULT_COLLECTION_SIZE - ) - - scanSegmentsAndSubmitJob(futureList, - loadsToMerge, - executor, - sqlContext, - compactionModel, - carbonLoadModel - ) - - try { - - futureList.asScala.foreach(future => { - future.get - } - ) - } catch { - case e: Exception => - LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }") - throw e - } - - // scan again and determine if anything is there to merge again. - CommonUtil.readLoadMetadataDetails(carbonLoadModel) - segList = carbonLoadModel.getLoadMetadataDetails - // in case of major compaction we will scan only once and come out as it will keep - // on doing major for the new loads also. - // excluding the newly added segments. - if (CompactionType.MAJOR == compactionModel.compactionType) { - - segList = CarbonDataMergerUtil - .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment) - } - - if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) { - loadsToMerge.clear() - } else if (segList.size > 0) { - loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged( - carbonLoadModel, - compactionModel.compactionSize, - segList, - compactionModel.compactionType - ) - } - else { - loadsToMerge.clear() - } - } - } - - /** - * This will submit the loads to be merged into the executor. 
- */ - private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]], - loadsToMerge: util.List[LoadMetadataDetails], - executor: ExecutorService, - sqlContext: SQLContext, - compactionModel: CompactionModel, - carbonLoadModel: CarbonLoadModel - ): Unit = { - loadsToMerge.asScala.foreach { seg => - LOGGER.info("loads identified for merge is " + seg.getLoadName) - } - - val compactionCallableModel = CompactionCallableModel( - carbonLoadModel, - compactionModel.carbonTable, - loadsToMerge, - sqlContext, - compactionModel.compactionType - ) - - val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel)) - futureList.add(future) - } - - def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = { - // Deleting the any partially loaded data if present. - // in some case the segment folder which is present in store will not have entry in - // status. - // so deleting those folders. - try { - CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true) - } catch { - case e: Exception => - LOGGER.error(s"Exception in compaction thread while clean up of stale segments" + - s" ${ e.getMessage }") - } - } - - private def isLoadDeletionRequired(metaDataLocation: String): Boolean = { - val details = SegmentStatusManager.readLoadMetadata(metaDataLocation) - if (details != null && details.nonEmpty) for (oneRow <- details) { - if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus || - SegmentStatus.COMPACTED == oneRow.getSegmentStatus) && - oneRow.getVisibility.equalsIgnoreCase("true")) { - return true - } - } - false - } - - def deleteLoadsAndUpdateMetadata( - isForceDeletion: Boolean, - carbonTable: CarbonTable): Unit = { - if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) { - val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath) - val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - val carbonTableStatusLock = - CarbonLockFactory.getCarbonLockObj( - absoluteTableIdentifier, - LockUsage.TABLE_STATUS_LOCK - ) - - // Delete marked loads - val isUpdationRequired = - DeleteLoadFolders.deleteLoadFoldersFromFileSystem( - absoluteTableIdentifier, - isForceDeletion, - details - ) - - if (isUpdationRequired) { - try { - // Update load metadate file after cleaning deleted nodes - if (carbonTableStatusLock.lockWithRetries()) { - LOGGER.info("Table status lock has been successfully acquired.") - - // read latest table status again. - val latestMetadata = SegmentStatusManager - .readLoadMetadata(carbonTable.getMetaDataFilepath) - - // update the metadata details from old to new status. - val latestStatus = CarbonLoaderUtil - .updateLoadMetadataFromOldToNew(details, latestMetadata) - - CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus) - } else { - val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName - val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName - val errorMsg = "Clean files request is failed for " + - s"$dbName.$tableName" + - ". Not able to acquire the table status lock due to other operation " + - "running in the background." 
- LOGGER.audit(errorMsg) - LOGGER.error(errorMsg) - throw new Exception(errorMsg + " Please try after some time.") - } - } finally { - CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index 74ed6a6..69c9fe4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -26,13 +26,15 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.constants.LoggerAction -import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} -import org.apache.carbondata.processing.util.TableOptionConstant +import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.load.ValidateUtil @@ -41,6 +43,8 @@ import org.apache.carbondata.spark.load.ValidateUtil */ object DataLoadingUtil { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + /** * get data loading options and initialise default value */ @@ -54,7 +58,6 @@ object DataLoadingUtil { optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\")) optionsFinal.put("commentchar", options.getOrElse("commentchar", "#")) optionsFinal.put("columndict", options.getOrElse("columndict", null)) - optionsFinal.put( "serialization_null_format", options.getOrElse("serialization_null_format", "\\N")) @@ -321,4 +324,70 @@ object DataLoadingUtil { CommonUtil.readLoadMetadataDetails(carbonLoadModel) } } + + private def isLoadDeletionRequired(metaDataLocation: String): Boolean = { + val details = SegmentStatusManager.readLoadMetadata(metaDataLocation) + if (details != null && details.nonEmpty) for (oneRow <- details) { + if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus || + SegmentStatus.COMPACTED == oneRow.getSegmentStatus) && + oneRow.getVisibility.equalsIgnoreCase("true")) { + return true + } + } + false + } + + def deleteLoadsAndUpdateMetadata( + isForceDeletion: Boolean, + carbonTable: CarbonTable): Unit = { + if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) { + val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath) + val absoluteTableIdentifier = 
carbonTable.getAbsoluteTableIdentifier + val carbonTableStatusLock = + CarbonLockFactory.getCarbonLockObj( + absoluteTableIdentifier, + LockUsage.TABLE_STATUS_LOCK + ) + + // Delete marked loads + val isUpdationRequired = + DeleteLoadFolders.deleteLoadFoldersFromFileSystem( + absoluteTableIdentifier, + isForceDeletion, + details + ) + + if (isUpdationRequired) { + try { + // Update load metadate file after cleaning deleted nodes + if (carbonTableStatusLock.lockWithRetries()) { + LOGGER.info("Table status lock has been successfully acquired.") + + // read latest table status again. + val latestMetadata = SegmentStatusManager + .readLoadMetadata(carbonTable.getMetaDataFilepath) + + // update the metadata details from old to new status. + val latestStatus = CarbonLoaderUtil + .updateLoadMetadataFromOldToNew(details, latestMetadata) + + CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus) + } else { + val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName + val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName + val errorMsg = "Clean files request is failed for " + + s"$dbName.$tableName" + + ". Not able to acquire the table status lock due to other operation " + + "running in the background." + LOGGER.audit(errorMsg) + LOGGER.error(errorMsg) + throw new Exception(errorMsg + " Please try after some time.") + } + } finally { + CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala new file mode 100644 index 0000000..636d731 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.spark.rdd + +import java.util.concurrent.ExecutorService + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonSession, SQLContext} +import org.apache.spark.sql.execution.command.CompactionModel +import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil +import org.apache.spark.sql.parser.CarbonSpark2SqlParser + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} + +/** + * Used to perform compaction on Aggregate data map. + */ +class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel, + compactionModel: CompactionModel, + executor: ExecutorService, + sqlContext: SQLContext, + storeLocation: String) + extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) { + + override def executeCompaction(): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = identifySegmentsToBeMerged() + val segments = loadMetaDataDetails.asScala.map(_.getLoadName) + if (segments.nonEmpty) { + val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadMetaDataDetails).split("_")(1) + CarbonSession.threadSet( + CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, + segments.mkString(",")) + CarbonSession.threadSet( + CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, "false") + val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala + .map(_.getColumnName).mkString(",") + // Creating a new query string to insert data into pre-aggregate table from that same table. 
+ // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1 + // select * from preaggtable1 + // The following code will generate the select query with a load UDF that will be used to + // apply DataLoadingRules + val childDataFrame = sqlContext.sparkSession.sql(new CarbonSpark2SqlParser() + // adding the aggregation load UDF + .addPreAggLoadFunction( + // creating the select query on the bases on table schema + PreAggregateUtil.createChildSelectQuery( + carbonTable.getTableInfo.getFactTable, carbonTable.getDatabaseName))).drop("preAggLoad") + try { + CarbonLoadDataCommand( + Some(carbonTable.getDatabaseName), + carbonTable.getTableName, + null, + Nil, + Map("fileheader" -> headers), + isOverwriteTable = false, + dataFrame = Some(childDataFrame), + internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true", + "mergedSegmentName" -> mergedLoadName)).run(sqlContext.sparkSession) + val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata( + carbonTable.getMetaDataFilepath) + val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect { + case load if loadMetaDataDetails.contains(load) => + load.setMergedLoadName(mergedLoadName) + load.setSegmentStatus(SegmentStatus.COMPACTED) + load.setModificationOrdeletionTimesStamp(System.currentTimeMillis()) + load + case other => other + } + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + .getAbsoluteTableIdentifier) + SegmentStatusManager + .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath, + updatedLoadMetaDataDetails) + carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava) + } finally { + // check if any other segments needs compaction on in case of MINOR_COMPACTION. + // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold + // allows it. + if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) { + executeCompaction() + } + CarbonSession + .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName) + CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." 
+ + carbonLoadModel.getTableName) + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 6393289..1d2934f 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -55,7 +55,7 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreStatusUpdateEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable} @@ -66,7 +66,7 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _} import org.apache.carbondata.spark.load._ -import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, Util} /** * This is the factory class which can create different RDD depends on user needs. @@ -161,18 +161,18 @@ object CarbonDataRDDFactory { val compactionThread = new Thread { override def run(): Unit = { + val compactor = CompactionFactory.getCompactor( + carbonLoadModel, + compactionModel, + executor, + sqlContext, + storeLocation) try { // compaction status of the table which is triggered by the user. 
var triggeredCompactionStatus = false var exception: Exception = null try { - DataManagementFunc.executeCompaction( - carbonLoadModel, - compactionModel, - executor, - sqlContext, - storeLocation - ) + compactor.executeCompaction() triggeredCompactionStatus = true } catch { case e: Exception => @@ -211,10 +211,12 @@ object CarbonDataRDDFactory { ) // proceed for compaction try { - DataManagementFunc.executeCompaction(newCarbonLoadModel, + CompactionFactory.getCompactor( + newCarbonLoadModel, newcompactionModel, - executor, sqlContext, storeLocation - ) + executor, + sqlContext, + storeLocation).executeCompaction() } catch { case e: Exception => LOGGER.error("Exception in compaction thread for table " + @@ -248,7 +250,7 @@ object CarbonDataRDDFactory { } } finally { executor.shutdownNow() - DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel) + compactor.deletePartialLoadsInCompaction() compactionLock.unlock() } } @@ -290,7 +292,7 @@ object CarbonDataRDDFactory { s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") // Check if any load need to be deleted before loading new data val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - DataManagementFunc.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable) + DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable) var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null @@ -492,6 +494,10 @@ object CarbonDataRDDFactory { throw new Exception("No Data to load") } writeDictionary(carbonLoadModel, result, writeAll = false) + val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession, + carbonTable.getCarbonTableIdentifier, + carbonLoadModel) + OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent) val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable) if (!done) { CommonUtil.updateTableStatusForFailure(carbonLoadModel) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala new file mode 100644 index 0000000..3ebc957 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
+
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * This class is used to perform compaction on a carbon table.
+ */
+class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
+    compactionModel: CompactionModel,
+    executor: ExecutorService,
+    sqlContext: SQLContext,
+    storeLocation: String)
+  extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
+
+  override def executeCompaction(): Unit = {
+    val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
+      carbonLoadModel.getLoadMetadataDetails
+    )
+    CarbonDataMergerUtil.sortSegments(sortedSegments)
+
+    var segList = carbonLoadModel.getLoadMetadataDetails
+    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+      carbonLoadModel,
+      compactionModel.compactionSize,
+      segList,
+      compactionModel.compactionType
+    )
+    while (loadsToMerge.size() > 1 ||
+           (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
+            loadsToMerge.size() > 0)) {
+      val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
+      deletePartialLoadsInCompaction()
+
+      try {
+        scanSegmentsAndSubmitJob(loadsToMerge)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
+          throw e
+      }
+
+      // scan again and determine if anything is there to merge again.
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      segList = carbonLoadModel.getLoadMetadataDetails
+      // in case of major compaction we scan only once and exit, since otherwise it would
+      // keep doing major compaction for the new loads as well;
+      // hence the newly added segments are excluded.
+      if (CompactionType.MAJOR == compactionModel.compactionType) {
+        segList = CarbonDataMergerUtil
+          .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
+      }
+
+      if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) {
+        loadsToMerge.clear()
+      } else if (segList.size > 0) {
+        loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+          carbonLoadModel,
+          compactionModel.compactionSize,
+          segList,
+          compactionModel.compactionType
+        )
+      } else {
+        loadsToMerge.clear()
+      }
+    }
+  }
+
+  /**
+   * This will submit the loads to be merged into the executor.
+   */
+  def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails]): Unit = {
+    loadsToMerge.asScala.foreach { seg =>
+      LOGGER.info("loads identified for merge is " + seg.getLoadName)
+    }
+    val compactionCallableModel = CompactionCallableModel(
+      carbonLoadModel,
+      compactionModel.carbonTable,
+      loadsToMerge,
+      sqlContext,
+      compactionModel.compactionType)
+    triggerCompaction(compactionCallableModel)
+  }
+
+  private def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+    val carbonTable = compactionCallableModel.carbonTable
+    val loadsToMerge = compactionCallableModel.loadsToMerge
+    val sc = compactionCallableModel.sqlContext
+    val carbonLoadModel = compactionCallableModel.carbonLoadModel
+    val compactionType = compactionCallableModel.compactionType
+    val tablePath = carbonLoadModel.getTablePath
+    val startTime = System.nanoTime()
+    val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+    var finalMergeStatus = false
+    val databaseName: String = carbonLoadModel.getDatabaseName
+    val factTableName = carbonLoadModel.getTableName
+    val validSegments: Array[String] = CarbonDataMergerUtil
+      .getValidSegments(loadsToMerge).split(',')
+    val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
+    val carbonMergerMapping = CarbonMergerMapping(tablePath,
+      carbonTable.getMetaDataFilepath,
+      mergedLoadName,
+      databaseName,
+      factTableName,
+      validSegments,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+      compactionType,
+      maxSegmentColCardinality = null,
+      maxSegmentColumnSchemaList = null
+    )
+    carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
+    carbonLoadModel.setLoadMetadataDetails(
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+    // trigger event for compaction
+    val operationContext = new OperationContext
+    val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
+      AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
+    OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+
+    var execInstance = "1"
+    // in case of non-dynamic executor allocation, the number of executors is fixed.
+    if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
+      execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
+      LOGGER.info(s"spark.executor.instances property is set to = $execInstance")
+    } // in case of dynamic executor allocation, taking the max executors of the dynamic allocation.
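
This branch, plus the dynamic-allocation branch that follows it, boils down to a small helper (a hedged restatement for clarity; resolveExecInstances and the "1" fallback for an unset maxExecutors are illustrative, the real code reads the property directly):

    import org.apache.spark.SparkConf

    // Illustrative restatement of how execInstance is derived: an explicitly
    // configured executor count wins; with dynamic allocation enabled, fall
    // back to the allowed maximum; otherwise default to "1".
    def resolveExecInstances(conf: SparkConf): String = {
      if (conf.contains("spark.executor.instances")) {
        conf.get("spark.executor.instances")
      } else if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
        conf.get("spark.dynamicAllocation.maxExecutors", "1")
      } else {
        "1"
      }
    }
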
+ else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) { + if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim + .equalsIgnoreCase("true")) { + execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors") + LOGGER.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance") + } + } + + val mergeStatus = + if (CompactionType.IUD_UPDDEL_DELTA == compactionType) { + new CarbonIUDMergerRDD( + sc.sparkContext, + new MergeResultImpl(), + carbonLoadModel, + carbonMergerMapping, + execInstance + ).collect + } else { + new CarbonMergerRDD( + sc.sparkContext, + new MergeResultImpl(), + carbonLoadModel, + carbonMergerMapping, + execInstance + ).collect + } + + if (mergeStatus.length == 0) { + finalMergeStatus = false + } else { + finalMergeStatus = mergeStatus.forall(_._2) + } + + if (finalMergeStatus) { + val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + CommonUtil.mergeIndexFiles( + sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false) + + // trigger event for compaction + val alterTableCompactionPostEvent: AlterTableCompactionPostEvent = + AlterTableCompactionPostEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc) + OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext) + + val endTime = System.nanoTime() + LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }") + val statusFileUpdation = + ((compactionType == CompactionType.IUD_UPDDEL_DELTA) && + CarbonDataMergerUtil + .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge, + carbonTable.getMetaDataFilepath, + carbonLoadModel)) || + CarbonDataMergerUtil + .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath, + mergedLoadNumber, carbonLoadModel, mergeLoadStartTime, compactionType) + + if (!statusFileUpdation) { + LOGGER.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." + + s"${ carbonLoadModel.getTableName }") + LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." + + s"${ carbonLoadModel.getTableName }") + throw new Exception(s"Compaction failed to update metadata for table" + + s" ${ carbonLoadModel.getDatabaseName }." 
+ + s"${ carbonLoadModel.getTableName }") + } else { + LOGGER.audit(s"Compaction request completed for table " + + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + LOGGER.info(s"Compaction request completed for table " + + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + } + } else { + LOGGER.audit(s"Compaction request failed for table " + + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" + ) + LOGGER.error(s"Compaction request failed for table " + + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + throw new Exception("Compaction Failure in Merger Rdd.") + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala new file mode 100644 index 0000000..6060f06 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import java.util.concurrent.ExecutorService + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.execution.command.CompactionModel + +import org.apache.carbondata.processing.loading.model.CarbonLoadModel + +object CompactionFactory { + + /** + * Returns appropriate Compactable object. 
+   */
+  def getCompactor(carbonLoadModel: CarbonLoadModel,
+      compactionModel: CompactionModel,
+      executor: ExecutorService,
+      sqlContext: SQLContext,
+      storeLocation: String): Compactor = {
+    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
+      new AggregateDataMapCompactor(
+        carbonLoadModel,
+        compactionModel,
+        executor,
+        sqlContext,
+        storeLocation)
+    } else {
+      new CarbonTableCompactor(
+        carbonLoadModel,
+        compactionModel,
+        executor,
+        sqlContext,
+        storeLocation)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
new file mode 100644
index 0000000..6fafc95
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.CompactionModel
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+abstract class Compactor(carbonLoadModel: CarbonLoadModel,
+    compactionModel: CompactionModel,
+    executor: ExecutorService,
+    sqlContext: SQLContext,
+    storeLocation: String) {
+
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def executeCompaction(): Unit
+
+  def identifySegmentsToBeMerged(): java.util.List[LoadMetadataDetails] = {
+    CarbonDataMergerUtil
+      .identifySegmentsToBeMerged(carbonLoadModel,
+        compactionModel.compactionSize,
+        carbonLoadModel.getLoadMetadataDetails,
+        compactionModel.compactionType)
+  }
+
+  def deletePartialLoadsInCompaction(): Unit = {
+    // Delete any partially loaded data if present.
+    // In some cases the segment folder present in the store will not have an entry in the
+    // status file,
+    // so delete those folders.
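
Compactor thus fixes the constructor shape and provides the shared helpers, leaving only executeCompaction() abstract. A minimal sketch of what any further implementation would have to supply (LoggingCompactor is purely illustrative and merges nothing):

    import java.util.concurrent.ExecutorService

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.command.CompactionModel

    import org.apache.carbondata.processing.loading.model.CarbonLoadModel

    // Purely illustrative subclass (not part of the patch); it only logs the
    // merge candidates instead of merging them.
    class LoggingCompactor(carbonLoadModel: CarbonLoadModel,
        compactionModel: CompactionModel,
        executor: ExecutorService,
        sqlContext: SQLContext,
        storeLocation: String)
      extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {

      override def executeCompaction(): Unit = {
        // both helpers are inherited from Compactor
        deletePartialLoadsInCompaction()
        val loadsToMerge = identifySegmentsToBeMerged()
        LOGGER.info(s"${ loadsToMerge.size() } segments qualify for merging")
      }
    }
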
+ try { + CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true) + } catch { + case e: Exception => + LOGGER.error(s"Exception in compaction thread while clean up of stale segments" + + s" ${ e.getMessage }") + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 0cb6ca6..a9b5455 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -259,7 +259,7 @@ object CarbonSession { def initListeners(): Unit = { OperationListenerBus.getInstance() .addListener(classOf[DropTablePostEvent], DataMapDropTablePostListener) - .addListener(classOf[LoadTablePostExecutionEvent], LoadPostAggregateListener) + .addListener(classOf[LoadTablePreStatusUpdateEvent], LoadPostAggregateListener) .addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener) .addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener) .addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener) @@ -271,5 +271,7 @@ object CarbonSession { .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener) .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener) .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener) + .addListener(classOf[AlterTableCompactionPostEvent], + AlterPreAggregateTableCompactionPostListener) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index e761bea..f642785 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -45,6 +45,7 @@ import org.apache.carbondata.format import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.exception.NoRetryException import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.merger.CompactionType import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel} import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil} @@ -128,8 +129,9 @@ case class CarbonLoadDataCommand( CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser), hadoopConf) } carbonLoadModel.setFactFilePath(factPath) - carbonLoadModel.setAggLoadRequest(internalOptions - .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) + carbonLoadModel.setAggLoadRequest( + 
internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) + carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) DataLoadingUtil.buildCarbonLoadModel( table, carbonProperty, http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 90b728d..9168247 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -19,14 +19,18 @@ package org.apache.spark.sql.execution.command.preaaggregate import scala.collection.JavaConverters._ -import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand +import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand} +import org.apache.spark.sql.execution.command.AlterTableModel import org.apache.spark.sql.CarbonSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.model.CarbonLoadModel object LoadPostAggregateListener extends OperationEventListener { /** @@ -35,7 +39,7 @@ object LoadPostAggregateListener extends OperationEventListener { * @param event */ override def onEvent(event: Event, operationContext: OperationContext): Unit = { - val loadEvent = event.asInstanceOf[LoadTablePostExecutionEvent] + val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent] val sparkSession = loadEvent.sparkSession val carbonLoadModel = loadEvent.carbonLoadModel val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable @@ -55,6 +59,36 @@ object LoadPostAggregateListener extends OperationEventListener { } } +/** + * Listener to handle the operations that have to be done after compaction for a table has finished. 
+ */ +object AlterPreAggregateTableCompactionPostListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val compactionEvent = event.asInstanceOf[AlterTableCompactionPostEvent] + val carbonTable = compactionEvent.carbonTable + val compactionType = compactionEvent.carbonMergerMapping.campactionType + val sparkSession = compactionEvent.sQLContext.sparkSession + if (carbonTable.hasDataMapSchema) { + carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema => + val childRelationIdentifier = dataMapSchema.getRelationIdentifier + val alterTableModel = AlterTableModel(Some(childRelationIdentifier.getDatabaseName), + childRelationIdentifier.getTableName, + None, + compactionType.toString, + Some(System.currentTimeMillis()), + "") + CarbonAlterTableCompactionCommand(alterTableModel).run(sparkSession) + } + } + } +} + object LoadPreAggregateTablePreListener extends OperationEventListener { /** * Called on a specified event occurrence
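
Taken end to end, the wiring above gives the behaviour this PR targets: compacting a parent table transparently compacts every pre-aggregate child, each child being rewritten through an internal insert-into-select load on the merged segment. A hedged sketch of the user-visible flow (table, datamap and path names are illustrative; assumes an existing CarbonSession bound to spark):

    import org.apache.spark.sql.SparkSession

    // 'sales', 'sales_agg' and the CSV path are illustrative names only.
    def cascadingCompactionDemo(spark: SparkSession): Unit = {
      spark.sql("CREATE TABLE sales (id INT, amount INT) " +
        "STORED BY 'org.apache.carbondata.format'")
      spark.sql("CREATE DATAMAP sales_agg ON TABLE sales USING 'preaggregate' " +
        "AS SELECT id, SUM(amount) FROM sales GROUP BY id")
      // four loads -> segments 0,1,2,3 on the parent and on sales_sales_agg
      (1 to 4).foreach { _ =>
        spark.sql("LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE sales")
      }
      // compacting the parent now cascades to the child through
      // AlterPreAggregateTableCompactionPostListener, merging the child's
      // segments 0-3 into 0.1 as well
      spark.sql("ALTER TABLE sales COMPACT 'minor'")
      spark.sql("SHOW SEGMENTS FOR TABLE sales_sales_agg").show()
    }
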
