http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index 67effda..cccfb3f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -17,6 +17,7 @@ package org.apache.carbondata.spark.testsuite.datamap +import java.io.File import java.util import scala.collection.JavaConverters._ @@ -26,14 +27,12 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.dev.{DataMapRefresher, DataMapWriter} import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory} +import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter} import org.apache.carbondata.core.datamap.status.{DataMapStatus, DataMapStatusManager} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment} import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.features.TableOperation -import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties @@ -44,10 +43,11 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { val testData = s"$resourcesPath/sample.csv" override def beforeAll: Unit = { + new File(CarbonProperties.getInstance().getSystemFolderLocation).delete() drop } - test("datamap status disable for new datamap") { + test("datamap status enable for new datamap") { sql("DROP TABLE IF EXISTS datamapstatustest") sql( """ @@ -64,11 +64,33 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { assert(details.length == 1) + assert(details.exists(p => p.getDataMapName.equals("statusdatamap") && p.getStatus == DataMapStatus.ENABLED)) + sql("DROP TABLE IF EXISTS datamapstatustest") + } + + test("datamap status disable for new datamap with deferred rebuild") { + sql("DROP TABLE IF EXISTS datamapstatustest") + sql( + """ + | CREATE TABLE datamapstatustest(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + s"""create datamap statusdatamap on table datamapstatustest + |using '${classOf[TestDataMapFactory].getName}' + |with deferred rebuild + |dmproperties('index_columns'='name') + | """.stripMargin) + + val details = DataMapStatusManager.readDataMapStatusDetails() + + assert(details.length == 1) + assert(details.exists(p => p.getDataMapName.equals("statusdatamap") && p.getStatus == DataMapStatus.DISABLED)) sql("DROP TABLE IF EXISTS datamapstatustest") } - test("datamap status disable after new load") { + test("datamap status disable after new load with deferred 
rebuild") { sql("DROP TABLE IF EXISTS datamapstatustest1") sql( """ @@ -78,6 +100,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { sql( s"""create datamap statusdatamap1 on table datamapstatustest1 |using '${classOf[TestDataMapFactory].getName}' + |with deferred rebuild |dmproperties('index_columns'='name') | """.stripMargin) @@ -94,8 +117,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS datamapstatustest1") } - // enable it in PR2255 - ignore("datamap status with refresh datamap") { + test("datamap status with REBUILD DATAMAP") { sql("DROP TABLE IF EXISTS datamapstatustest2") sql( """ @@ -105,6 +127,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { sql( s"""create datamap statusdatamap2 on table datamapstatustest2 |using '${classOf[TestDataMapFactory].getName}' + |with deferred rebuild |dmproperties('index_columns'='name') | """.stripMargin) @@ -119,7 +142,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { assert(details.length == 1) assert(details.exists(p => p.getDataMapName.equals("statusdatamap2") && p.getStatus == DataMapStatus.DISABLED)) - sql(s"refresh datamap statusdatamap2 on table datamapstatustest2") + sql(s"REBUILD DATAMAP statusdatamap2 on table datamapstatustest2") details = DataMapStatusManager.readDataMapStatusDetails() assert(details.length == 1) @@ -128,8 +151,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS datamapstatustest2") } - // enable it in PR2255 - ignore("datamap create without on table test") { + test("datamap create without on table test") { sql("DROP TABLE IF EXISTS datamapstatustest3") sql( """ @@ -144,10 +166,20 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { | """.stripMargin) } + sql("DROP TABLE IF EXISTS datamapstatustest3") + } + test("rebuild datamap status") { + sql("DROP TABLE IF EXISTS datamapstatustest3") + sql( + """ + | CREATE TABLE datamapstatustest3(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) sql( s"""create datamap statusdatamap3 on table datamapstatustest3 |using '${classOf[TestDataMapFactory].getName}' + |with deferred rebuild |dmproperties('index_columns'='name') | """.stripMargin) @@ -162,7 +194,7 @@ class TestDataMapStatus extends QueryTest with BeforeAndAfterAll { assert(details.length == 1) assert(details.exists(p => p.getDataMapName.equals("statusdatamap3") && p.getStatus == DataMapStatus.DISABLED)) - sql(s"refresh datamap statusdatamap3") + sql(s"REBUILD DATAMAP statusdatamap3") details = DataMapStatusManager.readDataMapStatusDetails() assert(details.length == 1) @@ -245,8 +277,19 @@ class TestDataMapFactory( false } - override def createRefresher(segment: Segment, - shardName: String): DataMapRefresher = { - ??? + override def createBuilder(segment: Segment, + shardName: String): DataMapBuilder = { + return new DataMapBuilder { + override def initialize(): Unit = { } + + override def addRow(blockletId: Int, + pageId: Int, + rowId: Int, + values: Array[AnyRef]): Unit = { } + + override def finish(): Unit = { } + + override def close(): Unit = { } + } } }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala index c193fcf..2f3488e 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala @@ -28,8 +28,6 @@ import org.apache.carbondata.spark.util.DataGenerator * Test Suite for search mode */ -// TODO: Need to Fix -@Ignore class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { val numRows = 500 * 1000 http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 9c5297d..3cabc7b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -24,11 +24,12 @@ import java.util.concurrent.{Callable, ExecutorService, Executors, Future} import scala.collection.JavaConverters._ import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SaveMode} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.dev.{DataMapRefresher, DataMapWriter} +import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter} import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment} import org.apache.carbondata.core.datastore.page.ColumnPage @@ -43,18 +44,30 @@ import org.apache.carbondata.events.Event // This testsuite test insert and insert overwrite with other commands concurrently class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach { private val executorService: ExecutorService = Executors.newFixedThreadPool(10) - var df: DataFrame = _ + var testData: DataFrame = _ override def beforeAll { dropTable() buildTestData() + createTable("orders", testData.schema) + createTable("orders_overwrite", testData.schema) sql( s""" | create datamap test on table orders | using '${classOf[WaitingDataMapFactory].getName}' | dmproperties('index_columns'='o_name') """.stripMargin) + + testData.write + .format("carbondata") + .option("tableName", "temp_table") + .option("tempCSV", "false") + .mode(SaveMode.Overwrite) + .save() + + sql(s"insert into orders select * 
from temp_table") + sql(s"insert into orders_overwrite select * from temp_table") } private def buildTestData(): Unit = { @@ -66,23 +79,17 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA import sqlContext.implicits._ val sdf = new SimpleDateFormat("yyyy-MM-dd") - df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000) + testData = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000) .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime), "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value, "ordersTable" + value)) .toDF("o_id", "o_date", "o_country", "o_name", "o_phonetype", "o_serialname", "o_salary", "o_comment") - createTable("orders") - createTable("orders_overwrite") } - private def createTable(tableName: String): Unit = { - df.write - .format("carbondata") - .option("tableName", tableName) - .option("tempCSV", "false") - .mode(SaveMode.Overwrite) - .save() + private def createTable(tableName: String, schema: StructType): Unit = { + val schemaString = schema.fields.map(x => x.name + " " + x.dataType.typeName).mkString(", ") + sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata") } override def afterAll { @@ -91,7 +98,7 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA } override def beforeEach(): Unit = { - Global.overwriteRunning = false + Global.loading = false } private def dropTable() = { @@ -101,12 +108,12 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA // run the input SQL and block until it is running private def runSqlAsync(sql: String): Future[String] = { - assert(!Global.overwriteRunning) + assert(!Global.loading) var count = 0 val future = executorService.submit( new QueryTask(sql) ) - while (!Global.overwriteRunning && count < 1000) { + while (!Global.loading && count < 1000) { Thread.sleep(10) // to avoid dead loop in case WaitingDataMapFactory is not invoked count += 1 @@ -202,9 +209,7 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA sql("drop table if exists t1") // number of segment is 1 after createTable - createTable("t1") - // number of segment is 2 after insert - sql("insert into table t1 select * from orders_overwrite") + createTable("t1", testData.schema) sql( s""" @@ -212,6 +217,12 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA | using '${classOf[WaitingDataMapFactory].getName}' | dmproperties('index_columns'='o_name') """.stripMargin) + + sql("insert into table t1 select * from orders_overwrite") + Thread.sleep(1100) + sql("insert into table t1 select * from orders_overwrite") + Thread.sleep(1100) + val future = runSqlAsync("insert into table t1 select * from orders_overwrite") sql("alter table t1 compact 'MAJOR'") assert(future.get.contains("PASS")) @@ -279,15 +290,13 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA } object Global { - var overwriteRunning = false + var loading = false } class WaitingDataMapFactory( carbonTable: CarbonTable, dataMapSchema: DataMapSchema) extends CoarseGrainDataMapFactory(carbonTable, dataMapSchema) { - private var identifier: AbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - override def fireEvent(event: Event): Unit = ??? 
override def clear(segmentId: Segment): Unit = {} @@ -311,14 +320,14 @@ class WaitingDataMapFactory( override def onBlockStart(blockId: String): Unit = { // trigger the second SQL to execute - Global.overwriteRunning = true + Global.loading = true // wait for 1 second to let second SQL to finish Thread.sleep(1000) } override def finish(): Unit = { - + Global.loading = false } } } @@ -341,8 +350,8 @@ class WaitingDataMapFactory( false } - override def createRefresher(segment: Segment, - shardName: String): DataMapRefresher = { + override def createBuilder(segment: Segment, + shardName: String): DataMapBuilder = { ??? } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index fea3482..890f8fc 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -181,7 +181,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val ON = carbonKeyWord("ON") protected val DMPROPERTIES = carbonKeyWord("DMPROPERTIES") protected val SELECT = carbonKeyWord("SELECT") - protected val REFRESH = carbonKeyWord("REFRESH") + protected val REBUILD = carbonKeyWord("REBUILD") + protected val DEFERRED = carbonKeyWord("DEFERRED") protected val doubleQuotedString = "\"([^\"]+)\"".r protected val singleQuotedString = "'([^']+)'".r http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java deleted file mode 100644 index 043acb1..0000000 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.datamap; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -/** - * Property that can be specified when creating DataMap - */ -@InterfaceAudience.Internal -public class DataMapProperty { - - /** - * Used to specify the store location of the datamap - */ - public static final String PARTITIONING = "partitioning"; - public static final String PATH = "path"; -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java index a5124a0..0642e01 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java @@ -105,7 +105,7 @@ public class IndexDataMapProvider extends DataMapProvider { @Override public void rebuild() { - IndexDataMapRefreshRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema()); + IndexDataMapRebuildRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema()); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java index 746a361..37d49e5 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java @@ -17,11 +17,15 @@ package org.apache.carbondata.datamap; +import java.util.HashMap; +import java.util.Map; + import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; import org.apache.carbondata.core.datamap.DataMapCatalog; import org.apache.carbondata.core.datamap.DataMapProvider; import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; @@ -55,9 +59,11 @@ public class PreAggregateDataMapProvider extends DataMapProvider { private void validateDmProperty(DataMapSchema dataMapSchema) throws MalformedDataMapCommandException { if (!dataMapSchema.getProperties().isEmpty()) { - if (dataMapSchema.getProperties().size() > 2 || ( - !dataMapSchema.getProperties().containsKey(DataMapProperty.PATH) && - !dataMapSchema.getProperties().containsKey(DataMapProperty.PARTITIONING))) { + Map<String, String> properties = new HashMap<>(dataMapSchema.getProperties()); + properties.remove(DataMapProperty.DEFERRED_REBUILD); + properties.remove(DataMapProperty.PATH); + properties.remove(DataMapProperty.PARTITIONING); + if (properties.size() > 0) { throw new MalformedDataMapCommandException( "Only 'path' and 'partitioning' dmproperties are allowed for this datamap"); } 
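
A condensed Scala sketch (illustrative only; the helper name is hypothetical) of the dmproperty validation approach used in the PreAggregateDataMapProvider change above: copy the user-supplied properties, strip the keys this provider understands (DEFERRED_REBUILD, PATH, PARTITIONING), and reject anything that remains.

    import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
    import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty

    // hypothetical helper mirroring PreAggregateDataMapProvider.validateDmProperty
    def validateDmProperties(props: Map[String, String]): Unit = {
      val allowed = Set(
        DataMapProperty.DEFERRED_REBUILD, // set implicitly by WITH DEFERRED REBUILD
        DataMapProperty.PATH,
        DataMapProperty.PARTITIONING)
      val unknown = props.keySet -- allowed
      if (unknown.nonEmpty) {
        throw new MalformedDataMapCommandException(
          "Only 'path' and 'partitioning' dmproperties are allowed for this datamap")
      }
    }
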
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala new file mode 100644 index 0000000..5902783 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap + +import java.io.{File, IOException} +import java.text.SimpleDateFormat +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{CarbonInputMetrics, Partition, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} +import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager, Segment} +import org.apache.carbondata.core.datamap.dev.DataMapBuilder +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo} +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.TaskMetricsMap +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader} +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport +import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl} +import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition} +import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl + +/** + * Helper object to rebuild the index DataMap + */ +object IndexDataMapRebuildRDD { + + /** + * Rebuild the datamap for all existing data in the table + */ + def rebuildDataMap( + sparkSession: SparkSession, + carbonTable: CarbonTable, + schema: DataMapSchema + ): Unit = { + val 
tableIdentifier = carbonTable.getAbsoluteTableIdentifier + val segmentStatusManager = new SegmentStatusManager(tableIdentifier) + val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments() + val validSegments = validAndInvalidSegments.getValidSegments + val indexedCarbonColumns = carbonTable.getIndexedColumns(schema) + + // loop all segments to rebuild DataMap + validSegments.asScala.foreach { segment => + // if lucene datamap folder is exists, not require to build lucene datamap again + refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName, + indexedCarbonColumns, segment.getSegmentNo); + } + } + + private def refreshOneSegment( + sparkSession: SparkSession, + carbonTable: CarbonTable, + dataMapName: String, + indexColumns: java.util.List[CarbonColumn], + segmentId: String): Unit = { + + val dataMapStorePath = + CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + + File.separator + + dataMapName + + if (!FileFactory.isFileExist(dataMapStorePath)) { + if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) { + try { + val status = new IndexDataMapRebuildRDD[String, Boolean]( + sparkSession, + new RefreshResultImpl(), + carbonTable.getTableInfo, + dataMapName, + indexColumns.asScala.toArray, + segmentId + ).collect() + + status.find(_._2 == false).foreach { task => + throw new Exception( + s"Task Failed to rebuild datamap $dataMapName on segment_$segmentId") + } + } catch { + case ex: Throwable => + // process failure + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath)) + throw new Exception( + s"Failed to refresh datamap $dataMapName on segment_$segmentId", ex) + } + } else { + throw new IOException(s"Failed to create directory $dataMapStorePath") + } + } + } + +} + +class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] { + override def initialize(carbonColumns: Array[CarbonColumn], + carbonTable: CarbonTable): Unit = { + } + + override def readRow(data: Array[Object]): Array[Object] = { + dataTypes.zipWithIndex.foreach { case (dataType, i) => + if (dataType == DataTypes.STRING) { + data(i) = data(i).toString + } + } + data + } + + override def close(): Unit = { + } +} + +class IndexDataMapRebuildRDD[K, V]( + session: SparkSession, + result: RefreshResult[K, V], + @transient tableInfo: TableInfo, + dataMapName: String, + indexColumns: Array[CarbonColumn], + segmentId: String +) extends CarbonRDDWithTableInfo[(K, V)]( + session.sparkContext, Nil, tableInfo.serialize()) { + + private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) + private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new util.Date()) + } + + override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val dataMapFactory = + DataMapManager.get().getDataMapProvider( + CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory + var status = false + val inputMetrics = new CarbonInputMetrics + TaskMetricsMap.getInstance().registerThreadCallback() + val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value + inputMetrics.initBytesReadCallback(context, inputSplit) + + val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) + val 
attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) + val format = createInputFormat(attemptContext) + + val model = format.createQueryModel(inputSplit, attemptContext) + // one query id per table + model.setQueryId(queryId) + model.setVectorReader(false) + model.setForcedDetailRawQuery(false) + model.setRequiredRowId(true) + + var reader: CarbonRecordReader[Array[Object]] = null + var refresher: DataMapBuilder = null + try { + reader = new CarbonRecordReader( + model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics) + reader.initialize(inputSplit, attemptContext) + + // we use task name as shard name to create the folder for this datamap + val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath) + refresher = dataMapFactory.createBuilder(new Segment(segmentId), shardName) + refresher.initialize() + + var blockletId = 0 + var firstRow = true + while (reader.nextKeyValue()) { + val rowWithPosition = reader.getCurrentValue + val size = rowWithPosition.length + val pageId = rowWithPosition(size - 2).asInstanceOf[Int] + val rowId = rowWithPosition(size - 1).asInstanceOf[Int] + + if (!firstRow && pageId == 0 && rowId == 0) { + // new blocklet started, increase blockletId + blockletId = blockletId + 1 + } else { + firstRow = false + } + + refresher.addRow(blockletId, pageId, rowId, rowWithPosition) + } + + refresher.finish() + + status = true + } finally { + if (reader != null) { + try { + reader.close() + } catch { + case ex: Throwable => + LOGGER.error(ex, "Failed to close reader") + } + } + + if (refresher != null) { + try { + refresher.close() + } catch { + case ex: Throwable => + LOGGER.error(ex, "Failed to close index writer") + } + } + } + + new Iterator[(K, V)] { + + var finished = false + + override def hasNext: Boolean = { + !finished + } + + override def next(): (K, V) = { + finished = true + result.getKey(split.index.toString, status) + } + } + } + + + private def createInputFormat( + attemptContext: TaskAttemptContextImpl) = { + val format = new CarbonTableInputFormat[Object] + val tableInfo1 = getTableInfo + val conf = attemptContext.getConfiguration + CarbonInputFormat.setTableInfo(conf, tableInfo1) + CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName) + CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName) + CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl]) + + val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier() + CarbonInputFormat.setTablePath( + conf, + identifier.appendWithLocalPrefix(identifier.getTablePath)) + + CarbonInputFormat.setSegmentsToAccess( + conf, + Segment.toSegmentList(Array(segmentId), null)) + + CarbonInputFormat.setColumnProjection( + conf, + new CarbonProjection(indexColumns.map(_.getColName))) + format + } + + override protected def getPartitions = { + if (!dataMapSchema.isIndexDataMap) { + throw new UnsupportedOperationException + } + val conf = new Configuration() + val jobConf = new JobConf(conf) + SparkHadoopUtil.get.addCredentials(jobConf) + val job = Job.getInstance(jobConf) + job.getConfiguration.set("query.id", queryId) + + val format = new CarbonTableInputFormat[Object] + + CarbonInputFormat.setSegmentsToAccess( + job.getConfiguration, + Segment.toSegmentList(Array(segmentId), null)) + + CarbonInputFormat.setTableInfo( + job.getConfiguration, + tableInfo) + CarbonInputFormat.setTablePath( + job.getConfiguration, + tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath) + 
CarbonInputFormat.setDatabaseName( + job.getConfiguration, + tableInfo.getDatabaseName) + CarbonInputFormat.setTableName( + job.getConfiguration, + tableInfo.getFactTable.getTableName) + + format + .getSplits(job) + .asScala + .map(_.asInstanceOf[CarbonInputSplit]) + .groupBy(_.taskId) + .map { group => + new CarbonMultiBlockSplit( + group._2.asJava, + group._2.flatMap(_.getLocations).toArray) + } + .zipWithIndex + .map { split => + new CarbonSparkPartition(id, split._2, split._1) + } + .toArray + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala deleted file mode 100644 index c341c36..0000000 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRefreshRDD.scala +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.datamap - -import java.io.{File, IOException} -import java.text.SimpleDateFormat -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.{CarbonInputMetrics, Partition, TaskContext} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.SparkSession - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} -import org.apache.carbondata.core.datamap.dev.DataMapRefresher -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo} -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn -import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.TaskMetricsMap -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader} -import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} -import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport -import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl} -import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition} -import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl - -/** - * Helper object to rebuild the index DataMap - */ -object IndexDataMapRefreshRDD { - - /** - * Rebuild the datamap for all existing data in the table - */ - def rebuildDataMap( - sparkSession: SparkSession, - carbonTable: CarbonTable, - schema: DataMapSchema - ): Unit = { - val tableIdentifier = carbonTable.getAbsoluteTableIdentifier - val segmentStatusManager = new SegmentStatusManager(tableIdentifier) - val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments() - val validSegments = validAndInvalidSegments.getValidSegments - val indexedCarbonColumns = carbonTable.getIndexedColumns(schema) - - // loop all segments to rebuild DataMap - validSegments.asScala.foreach { segment => - // if lucene datamap folder is exists, not require to build lucene datamap again - refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName, - indexedCarbonColumns, segment.getSegmentNo); - } - } - - private def refreshOneSegment( - sparkSession: SparkSession, - carbonTable: CarbonTable, - dataMapName: String, - indexColumns: java.util.List[CarbonColumn], - segmentId: String): Unit = { - - val dataMapStorePath = - CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + - File.separator + - dataMapName - - if (!FileFactory.isFileExist(dataMapStorePath)) { - if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) { - try { - val status = new IndexDataMapRefreshRDD[String, Boolean]( - sparkSession, - new RefreshResultImpl(), - carbonTable.getTableInfo, - dataMapName, - indexColumns.asScala.toArray, - segmentId - ).collect() - - status.find(_._2 == false).foreach { task => - throw new Exception( - s"Task Failed to refresh datamap $dataMapName on segment_$segmentId") - } - } catch { - case ex: Throwable => - // 
process failure - FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath)) - throw new Exception( - s"Failed to refresh datamap $dataMapName on segment_$segmentId", ex) - } - } else { - throw new IOException(s"Failed to create directory $dataMapStorePath") - } - } - } - -} - -class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] { - override def initialize(carbonColumns: Array[CarbonColumn], - carbonTable: CarbonTable): Unit = { - } - - override def readRow(data: Array[Object]): Array[Object] = { - dataTypes.zipWithIndex.foreach { case (dataType, i) => - if (dataType == DataTypes.STRING) { - data(i) = data(i).toString - } - } - data - } - - override def close(): Unit = { - } -} - -class IndexDataMapRefreshRDD[K, V]( - session: SparkSession, - result: RefreshResult[K, V], - @transient tableInfo: TableInfo, - dataMapName: String, - indexColumns: Array[CarbonColumn], - segmentId: String -) extends CarbonRDDWithTableInfo[(K, V)]( - session.sparkContext, Nil, tableInfo.serialize()) { - - private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) - private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") - private val jobTrackerId: String = { - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - formatter.format(new util.Date()) - } - - override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val dataMapFactory = - DataMapManager.get().getDataMapProvider( - CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory - var status = false - val inputMetrics = new CarbonInputMetrics - TaskMetricsMap.getInstance().registerThreadCallback() - val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value - inputMetrics.initBytesReadCallback(context, inputSplit) - - val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) - val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) - val format = createInputFormat(attemptContext) - - val model = format.createQueryModel(inputSplit, attemptContext) - // one query id per table - model.setQueryId(queryId) - model.setVectorReader(false) - model.setForcedDetailRawQuery(false) - model.setRequiredRowId(true) - - var reader: CarbonRecordReader[Array[Object]] = null - var refresher: DataMapRefresher = null - try { - reader = new CarbonRecordReader( - model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics) - reader.initialize(inputSplit, attemptContext) - - // we use task name as shard name to create the folder for this datamap - val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath) - refresher = dataMapFactory.createRefresher(new Segment(segmentId), shardName) - refresher.initialize() - - var blockletId = 0 - var firstRow = true - while (reader.nextKeyValue()) { - val rowWithPosition = reader.getCurrentValue - val size = rowWithPosition.length - val pageId = rowWithPosition(size - 2).asInstanceOf[Int] - val rowId = rowWithPosition(size - 1).asInstanceOf[Int] - - if (!firstRow && pageId == 0 && rowId == 0) { - // new blocklet started, increase blockletId - blockletId = blockletId + 1 - } else { - firstRow = false - } - - refresher.addRow(blockletId, pageId, rowId, rowWithPosition) - } - - refresher.finish() - - status = true - } finally { - if (reader != null) { - try { - reader.close() - } catch 
{ - case ex: Throwable => - LOGGER.error(ex, "Failed to close reader") - } - } - - if (refresher != null) { - try { - refresher.close() - } catch { - case ex: Throwable => - LOGGER.error(ex, "Failed to close index writer") - } - } - } - - new Iterator[(K, V)] { - - var finished = false - - override def hasNext: Boolean = { - !finished - } - - override def next(): (K, V) = { - finished = true - result.getKey(split.index.toString, status) - } - } - } - - - private def createInputFormat( - attemptContext: TaskAttemptContextImpl) = { - val format = new CarbonTableInputFormat[Object] - val tableInfo1 = getTableInfo - val conf = attemptContext.getConfiguration - CarbonInputFormat.setTableInfo(conf, tableInfo1) - CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName) - CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName) - CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl]) - - val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier() - CarbonInputFormat.setTablePath( - conf, - identifier.appendWithLocalPrefix(identifier.getTablePath)) - - CarbonInputFormat.setSegmentsToAccess( - conf, - Segment.toSegmentList(Array(segmentId), null)) - - CarbonInputFormat.setColumnProjection( - conf, - new CarbonProjection(indexColumns.map(_.getColName))) - format - } - - override protected def getPartitions = { - if (!dataMapSchema.isIndexDataMap) { - throw new UnsupportedOperationException - } - val conf = new Configuration() - val jobConf = new JobConf(conf) - SparkHadoopUtil.get.addCredentials(jobConf) - val job = Job.getInstance(jobConf) - job.getConfiguration.set("query.id", queryId) - - val format = new CarbonTableInputFormat[Object] - - CarbonInputFormat.setSegmentsToAccess( - job.getConfiguration, - Segment.toSegmentList(Array(segmentId), null)) - - CarbonInputFormat.setTableInfo( - job.getConfiguration, - tableInfo) - CarbonInputFormat.setTablePath( - job.getConfiguration, - tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath) - CarbonInputFormat.setDatabaseName( - job.getConfiguration, - tableInfo.getDatabaseName) - CarbonInputFormat.setTableName( - job.getConfiguration, - tableInfo.getFactTable.getTableName) - - format - .getSplits(job) - .asScala - .map(_.asInstanceOf[CarbonInputSplit]) - .groupBy(_.taskId) - .map { group => - new CarbonMultiBlockSplit( - group._2.asJava, - group._2.flatMap(_.getLocations).toArray) - } - .zipWithIndex - .map { split => - new CarbonSparkPartition(id, split._2, split._1) - } - .toArray - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 42c9c25..bdbaef5 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -848,11 +848,11 @@ object CarbonDataRDDFactory { val errorMessage = s"Dataload failed due to failure in table status updation for" + s" ${carbonLoadModel.getTableName}" LOGGER.audit("Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + 
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") LOGGER.error("Dataload failed due to failure in table status updation.") throw new Exception(errorMessage) } else { - DataMapStatusManager.disableDataMapsOfTable(carbonTable) + DataMapStatusManager.disableAllLazyDataMaps(carbonTable) } done } http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 5fac5a8..497f95a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.profiler.{Profiler, SQLStart} import org.apache.spark.util.{CarbonReflectionUtils, Utils} +import org.apache.carbondata.common.annotations.InterfaceAudience import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.scan.expression.LiteralExpression @@ -110,6 +111,16 @@ class CarbonSession(@transient val sc: SparkContext, ) } + /** + * Return true if the specified sql statement will hit the datamap + * This API is for test purpose only + */ + @InterfaceAudience.Developer(Array("DataMap")) + def isDataMapHit(sqlStatement: String, dataMapName: String): Boolean = { + val message = sql(s"EXPLAIN $sqlStatement").collect() + message(0).getString(0).contains(dataMapName) + } + def isSearchModeEnabled: Boolean = carbonStore != null /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 34a4013..25589d4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.command.datamap +import java.util + import scala.collection.JavaConverters._ import scala.collection.mutable @@ -27,7 +29,7 @@ import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandExcept import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager} import org.apache.carbondata.core.datamap.status.DataMapStatusManager -import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider +import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider} @@ -41,7 +43,8 @@ case class CarbonCreateDataMapCommand( dmProviderName: String, dmProperties: Map[String, String], queryString: Option[String], - 
ifNotExistsSet: Boolean = false) + ifNotExistsSet: Boolean = false, + deferredRebuild: Boolean = false) extends AtomicRunnableCommand { private var dataMapProvider: DataMapProvider = _ @@ -78,9 +81,16 @@ case class CarbonCreateDataMapCommand( } dataMapSchema = new DataMapSchema(dataMapName, dmProviderName) - dataMapSchema.setProperties(new java.util.HashMap[String, String]( - dmProperties.map(x => (x._1.trim, x._2.trim)).asJava)) + val property = dmProperties.map(x => (x._1.trim, x._2.trim)).asJava + val javaMap = new java.util.HashMap[String, String](property) + javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString) + dataMapSchema.setProperties(javaMap) + + if (dataMapSchema.isIndexDataMap && mainTable == null) { + throw new MalformedDataMapCommandException( + "For this datamap, main table is required. Use `CREATE DATAMAP ... ON TABLE ...` ") + } dataMapProvider = DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession) // If it is index datamap, check whether the column has datamap created already @@ -101,6 +111,10 @@ case class CarbonCreateDataMapCommand( dataMapProvider.initMeta(queryString.orNull) DataMapStatusManager.disableDataMap(dataMapName) case _ => + if (deferredRebuild) { + throw new MalformedDataMapCommandException( + "DEFERRED REBUILD is not supported on this DataMap") + } dataMapProvider.initMeta(queryString.orNull) } val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -111,10 +125,11 @@ case class CarbonCreateDataMapCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { if (dataMapProvider != null) { dataMapProvider.initData() - if (mainTable != null && - mainTable.isAutoRefreshDataMap && - !dataMapSchema.isIndexDataMap) { + if (mainTable != null && !deferredRebuild) { dataMapProvider.rebuild() + if (dataMapSchema.isIndexDataMap) { + DataMapStatusManager.enableDataMap(dataMapName) + } } } Seq.empty http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala new file mode 100644 index 0000000..6493c83 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.datamap + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand + +import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager} +import org.apache.carbondata.core.datamap.status.DataMapStatusManager +import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRebuildRDD} + +/** + * Rebuild the datamaps through sync with main table data. After sync with parent table's it enables + * the datamap. + */ +case class CarbonDataMapRebuildCommand( + dataMapName: String, + tableIdentifier: Option[TableIdentifier]) extends DataCommand { + + override def processData(sparkSession: SparkSession): Seq[Row] = { + val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) + + val table = tableIdentifier match { + case Some(identifier) => + CarbonEnv.getCarbonTable(identifier)(sparkSession) + case _ => + CarbonEnv.getCarbonTable( + Option(schema.getRelationIdentifier.getDatabaseName), + schema.getRelationIdentifier.getTableName + )(sparkSession) + } + val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession) + provider.rebuild() + + // After rebuild successfully enable the datamap. + DataMapStatusManager.enableDataMap(dataMapName) + Seq.empty + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala deleted file mode 100644 index 4f3b7bc..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.command.datamap - -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.command.DataCommand - -import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager} -import org.apache.carbondata.core.datamap.status.DataMapStatusManager -import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRefreshRDD} - -/** - * Refresh the datamaps through sync with main table data. After sync with parent table's it enables - * the datamap. 
- */ -case class CarbonDataMapRefreshCommand( - dataMapName: String, - tableIdentifier: Option[TableIdentifier]) extends DataCommand { - - override def processData(sparkSession: SparkSession): Seq[Row] = { - val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) - - val table = tableIdentifier match { - case Some(identifier) => - CarbonEnv.getCarbonTable(identifier)(sparkSession) - case _ => - CarbonEnv.getCarbonTable( - Option(schema.getRelationIdentifier.getDatabaseName), - schema.getRelationIdentifier.getTableName - )(sparkSession) - } - val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession) - provider.rebuild() - - // After sync success enable the datamap. - DataMapStatusManager.enableDataMap(dataMapName) - Seq.empty - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala index 1bb7d7c..4f60297 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala @@ -231,8 +231,9 @@ class SparkCarbonFileFormat extends FileFormat val tab = model.getTable DataMapStoreManager.getInstance().clearDataMaps(identifier) - val dataMapExprWrapper = DataMapChooser.get - .choose(tab, model.getFilterExpressionResolverTree) + + val dataMapExprWrapper = new DataMapChooser(tab).choose( + model.getFilterExpressionResolverTree) // TODO : handle the partition for CarbonFileLevelFormat val prunedBlocklets = dataMapExprWrapper.prune(segments, null) http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 62da7ed..9dd8105 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRefreshCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand} +import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand} import org.apache.spark.sql.execution.command.management._ import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropPartitionCommand, CarbonAlterTableSplitPartitionCommand} import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, 
CarbonAlterTableDropColumnCommand} @@ -147,19 +147,23 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { /** * The syntax of datamap creation is as follows. - * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapProviderName' + * CREATE DATAMAP IF NOT EXISTS datamapName [ON TABLE tableName] + * USING 'DataMapProviderName' + * [WITH DEFERRED REBUILD] * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName */ protected lazy val createDataMap: Parser[LogicalPlan] = CREATE ~> DATAMAP ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~ opt(ontable) ~ - (USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~ + (USING ~> stringLit) ~ + opt(WITH ~> DEFERRED ~> REBUILD) ~ + (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~ (AS ~> restInput).? <~ opt(";") ^^ { - case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ dmprops ~ query => + case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ deferred ~ dmprops ~ query => val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String] CarbonCreateDataMapCommand(dmname, tableIdent, dmProviderName, map, query, - ifnotexists.isDefined) + ifnotexists.isDefined, deferred.isDefined) } protected lazy val ontable: Parser[TableIdentifier] = @@ -190,12 +194,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { /** * The syntax of show datamap is used to show datamaps on the table - * REFRESH DATAMAP datamapname [ON TABLE] tableName + * REBUILD DATAMAP datamapname [ON TABLE] tableName */ protected lazy val refreshDataMap: Parser[LogicalPlan] = - REFRESH ~> DATAMAP ~> ident ~ opt(ontable) <~ opt(";") ^^ { + REBUILD ~> DATAMAP ~> ident ~ opt(ontable) <~ opt(";") ^^ { case datamap ~ tableIdent => - CarbonDataMapRefreshCommand(datamap, tableIdent) + CarbonDataMapRebuildCommand(datamap, tableIdent) } protected lazy val deleteRecords: Parser[LogicalPlan] = http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala index a8e7b6c..7df3901 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala @@ -22,44 +22,56 @@ import java.util.UUID import scala.util.Random +import org.apache.spark.sql.{CarbonSession, DataFrame} import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.util.CarbonProperties -class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { - val inputFile = s"$resourcesPath/bloom_datamap_input.csv" +class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach { + val bigFile = s"$resourcesPath/bloom_datamap_input_big.csv" + val smallFile = s"$resourcesPath/bloom_datamap_input_small.csv" val normalTable = "carbon_normal" val bloomDMSampleTable = "carbon_bloom" val dataMapName = "bloom_dm" - val lineNum = 500000 override protected def beforeAll(): 
Unit = { new File(CarbonProperties.getInstance().getSystemFolderLocation).delete() - createFile(inputFile, line = lineNum, start = 0) + createFile(bigFile, line = 500000) + createFile(smallFile) sql(s"DROP TABLE IF EXISTS $normalTable") sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - private def checkQuery = { + override def afterEach(): Unit = { + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + } + + private def checkSqlHitDataMap(sqlText: String, dataMapName: String, shouldHit: Boolean): DataFrame = { + if (shouldHit) { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isDataMapHit(sqlText, dataMapName)) + } else { + assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isDataMapHit(sqlText, dataMapName)) + } + sql(sqlText) + } + + private def checkQuery(dataMapName: String, shouldHit: Boolean = true) = { checkAnswer( - sql(s"select * from $bloomDMSampleTable where id = 1"), + checkSqlHitDataMap(s"select * from $bloomDMSampleTable where id = 1", dataMapName, shouldHit), sql(s"select * from $normalTable where id = 1")) checkAnswer( - sql(s"select * from $bloomDMSampleTable where id = 999"), + checkSqlHitDataMap(s"select * from $bloomDMSampleTable where id = 999", dataMapName, shouldHit), sql(s"select * from $normalTable where id = 999")) checkAnswer( - sql(s"select * from $bloomDMSampleTable where city = 'city_1'"), + checkSqlHitDataMap(s"select * from $bloomDMSampleTable where city = 'city_1'", dataMapName, shouldHit), sql(s"select * from $normalTable where city = 'city_1'")) checkAnswer( - sql(s"select * from $bloomDMSampleTable where city = 'city_999'"), + checkSqlHitDataMap(s"select * from $bloomDMSampleTable where city = 'city_999'", dataMapName, shouldHit), sql(s"select * from $normalTable where city = 'city_999'")) - checkAnswer( - sql(s"select count(distinct id), count(distinct name), count(distinct city)," + - s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"), - sql(s"select count(distinct id), count(distinct name), count(distinct city)," + - s" count(distinct s1), count(distinct s2) from $normalTable")) - checkAnswer( + checkAnswer( sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" + s" from $bloomDMSampleTable"), sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" + @@ -86,28 +98,34 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') """.stripMargin) + var map = DataMapStatusManager.readDataMapStatusMap() + assert(map.get(dataMapName).isEnabled) + // load two segments (1 to 2).foreach { i => sql( s""" - | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable | OPTIONS('header'='false') """.stripMargin) sql( s""" - | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable | OPTIONS('header'='false') """.stripMargin) } + map = DataMapStatusManager.readDataMapStatusMap() + assert(map.get(dataMapName).isEnabled) + sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false) checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) - checkQuery + checkQuery(dataMapName) sql(s"DROP TABLE IF EXISTS $normalTable") sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - test("test create bloom datamap and refresh datamap") { + test("test create bloom datamap and REBUILD DATAMAP") { sql( s""" 
| CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, @@ -125,12 +143,12 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { (1 to 2).foreach { i => sql( s""" - | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable | OPTIONS('header'='false') """.stripMargin) sql( s""" - | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable | OPTIONS('header'='false') """.stripMargin) } @@ -142,18 +160,135 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') """.stripMargin) - sql(s"REFRESH DATAMAP $dataMapName ON TABLE $bloomDMSampleTable") sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false) checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) - checkQuery + checkQuery(dataMapName) + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + } + + test("test create bloom datamap with DEFERRED REBUILD, query hit datamap") { + sql( + s""" + | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable + | USING 'bloomfilter' + | WITH DEFERRED REBUILD + | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') + """.stripMargin) + + var map = DataMapStatusManager.readDataMapStatusMap() + assert(!map.get(dataMapName).isEnabled) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + + map = DataMapStatusManager.readDataMapStatusMap() + assert(!map.get(dataMapName).isEnabled) + + // once we rebuild, it should be enabled + sql(s"REBUILD DATAMAP $dataMapName ON TABLE $bloomDMSampleTable") + map = DataMapStatusManager.readDataMapStatusMap() + assert(map.get(dataMapName).isEnabled) + + sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) + checkQuery(dataMapName) + + // once we load again, datamap should be disabled, since it is lazy + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + map = DataMapStatusManager.readDataMapStatusMap() + assert(!map.get(dataMapName).isEnabled) + checkQuery(dataMapName, shouldHit = false) + sql(s"DROP TABLE IF EXISTS 
$normalTable") sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - // todo: will add more tests on bloom datamap, such as exception, delete datamap, show profiler + test("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") { + sql( + s""" + | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable + | USING 'bloomfilter' + | WITH DEFERRED REBUILD + | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') + """.stripMargin) + + checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) + + // datamap is not loaded, so it should not hit + checkQuery(dataMapName, shouldHit = false) + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + } override protected def afterAll(): Unit = { - deleteFile(inputFile) + deleteFile(bigFile) + deleteFile(smallFile) sql(s"DROP TABLE IF EXISTS $normalTable") sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 9c3d5d6..3dc34d3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -61,8 +61,12 @@ public class DataMapWriterListener { } if (tableIndices != null) { for (TableDataMap tableDataMap : tableIndices) { - DataMapFactory factory = tableDataMap.getDataMapFactory(); - register(factory, segmentId, taskNo); + // register it only if it is not lazy datamap, for lazy datamap, user + // will rebuild the datamap manually + if (!tableDataMap.getDataMapSchema().isLazy()) { + DataMapFactory factory = tableDataMap.getDataMapFactory(); + register(factory, segmentId, taskNo); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---------------------------------------------------------------------- diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java index 957e9f8..35acb17 100644 --- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java +++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java @@ 
-21,12 +21,11 @@ import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datamap.DataMapChooser; -import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.block.TableBlockInfo; @@ -104,8 +103,10 @@ public class SearchRequestHandler { LOG.info(String.format("[SearchId:%d] %s, number of block: %d", request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size())); - // If there is FGDataMap, prune the split by applying FGDataMap - queryModel = tryPruneByFGDataMap(request.searchId(), table, queryModel, mbSplit); + // If there is DataMap selected in Master, prune the split by it + if (request.dataMap() != null) { + queryModel = prune(request.searchId(), table, queryModel, mbSplit, request.dataMap().get()); + } // In search mode, reader will read multiple blocks by using a thread pool CarbonRecordReader<CarbonRow> reader = @@ -135,35 +136,32 @@ public class SearchRequestHandler { * If there is FGDataMap defined for this table and filter condition in the query, * prune the splits by the DataMap and set the pruned split into the QueryModel and return */ - private QueryModel tryPruneByFGDataMap(int queryId, - CarbonTable table, QueryModel queryModel, CarbonMultiBlockSplit mbSplit) throws IOException { - DataMapExprWrapper wrapper = - DataMapChooser.get().choose(table, queryModel.getFilterExpressionResolverTree()); - - if (wrapper.getDataMapLevel() == DataMapLevel.FG) { - List<Segment> segments = new LinkedList<>(); - for (CarbonInputSplit split : mbSplit.getAllSplits()) { - segments.add(Segment.toSegment( - split.getSegmentId(), new LatestFilesReadCommittedScope(table.getTablePath()))); - } - List<ExtendedBlocklet> prunnedBlocklets = wrapper.prune(segments, null); + private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel, + CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException { + Objects.requireNonNull(datamap); + List<Segment> segments = new LinkedList<>(); + for (CarbonInputSplit split : mbSplit.getAllSplits()) { + segments.add( + Segment.toSegment(split.getSegmentId(), + new LatestFilesReadCommittedScope(table.getTablePath()))); + } + List<ExtendedBlocklet> prunnedBlocklets = datamap.prune(segments, null); - List<String> pathToRead = new LinkedList<>(); - for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) { - pathToRead.add(prunnedBlocklet.getPath()); - } + List<String> pathToRead = new LinkedList<>(); + for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) { + pathToRead.add(prunnedBlocklet.getPath()); + } - List<TableBlockInfo> blocks = queryModel.getTableBlockInfos(); - List<TableBlockInfo> blockToRead = new LinkedList<>(); - for (TableBlockInfo block : blocks) { - if (pathToRead.contains(block.getFilePath())) { - blockToRead.add(block); - } + List<TableBlockInfo> blocks = queryModel.getTableBlockInfos(); + List<TableBlockInfo> blockToRead = new LinkedList<>(); + for (TableBlockInfo block : blocks) { + if (pathToRead.contains(block.getFilePath())) { + blockToRead.add(block); } - LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", - 
queryId, blockToRead.size())); - queryModel.setTableBlockInfos(blockToRead); } + LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", queryId, + blockToRead.size())); + queryModel.setTableBlockInfos(blockToRead); return queryModel; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala index 5b31a49..2e9a532 100644 --- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala +++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala @@ -38,12 +38,15 @@ import org.apache.spark.util.ThreadUtils import org.apache.carbondata.common.annotations.InterfaceAudience import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.DataMapChooser +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.hadoop.CarbonMultiBlockSplit +import org.apache.carbondata.hadoop.api.CarbonInputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.store.worker.Status @@ -212,11 +215,13 @@ class Master(sparkConf: SparkConf) { // prune data and get a mapping of worker hostname to list of blocks, // then add these blocks to the SearchRequest and fire the RPC call val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter) + val fgDataMap = chooseFGDataMap(table, filter) val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) => // Build a SearchRequest val split = new SerializableWritable[CarbonMultiBlockSplit]( new CarbonMultiBlockSplit(blocks, splitAddress)) - val request = SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit) + val request = + SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit, fgDataMap) // Find an Endpoind and send the request to it // This RPC is non-blocking so that we do not need to wait before send to next worker @@ -249,6 +254,14 @@ class Master(sparkConf: SparkConf) { output.toArray } + private def chooseFGDataMap( + table: CarbonTable, + filter: Expression): Option[DataMapExprWrapper] = { + val chooser = new DataMapChooser(table) + val filterInterface = table.resolveFilter(filter) + Option(chooser.chooseFGDataMap(filterInterface)) + } + /** * Prune data by using CarbonInputFormat.getSplit * Return a mapping of host address to list of block @@ -261,6 +274,9 @@ class Master(sparkConf: SparkConf) { val job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonTableInputFormat( job, table, columns, filter, null, null) + + // We will do FG pruning in reader side, so don't do it here + CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false) val splits = format.getSplits(job) val distributables = splits.asScala.map { split => split.asInstanceOf[Distributable] 
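
Note on the Master.scala and SearchRequestHandler.java changes above: FG datamap pruning is now split between the driver and the workers. The Scala sketch below is illustrative only and not part of the patch; `table`, `filter`, `queryId`, `split`, `columns` and `localLimit` stand for the values already computed in Master.search, and the worker-side behavior is summarized in comments.

import org.apache.carbondata.core.datamap.DataMapChooser
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper

// Driver side: block-level pruning via CarbonInputFormat no longer applies FG
// datamaps (setFgDataMapPruning(job.getConfiguration, false)); instead the master
// chooses an FG datamap once and ships it with every SearchRequest.
val fgDataMap: Option[DataMapExprWrapper] =
  Option(new DataMapChooser(table).chooseFGDataMap(table.resolveFilter(filter)))

val request =
  SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit, fgDataMap)

// Worker side: SearchRequestHandler.prune(...) runs only when request.dataMap()
// is non-null, keeping only the blocks of the CarbonMultiBlockSplit that the
// selected wrapper's prune() result points to.
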
http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/store/search/src/main/scala/org/apache/spark/search/Searcher.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala index e467fd3..1532284 100644 --- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala +++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala @@ -21,6 +21,7 @@ import org.apache.spark.SerializableWritable import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv} import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.hadoop.CarbonMultiBlockSplit @@ -37,11 +38,11 @@ class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint { } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case req@SearchRequest(_, _, _, _, _, _) => + case req: SearchRequest => val response = new SearchRequestHandler().handleSearch(req) context.reply(response) - case req@ShutdownRequest(_) => + case req: ShutdownRequest => val response = new SearchRequestHandler().handleShutdown(req) context.reply(response) @@ -59,7 +60,8 @@ case class SearchRequest( tableInfo: TableInfo, projectColumns: Array[String], filterExpression: Expression, - limit: Long) + limit: Long, + dataMap: Option[DataMapExprWrapper]) // Search result sent from worker to master case class SearchResult(